/* This file is part of web3.js. web3.js is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. web3.js is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ /** * @file WebsocketProvider.js * @authors: Samuel Furter , Fabian Vogelsteller * @date 2019 */ 'use strict'; var EventEmitter = require('eventemitter3'); var helpers = require('./helpers.js'); var errors = require('web3-core-helpers').errors; var Ws = require('websocket').w3cwebsocket; /** * @param {string} url * @param {Object} options * * @constructor */ var WebsocketProvider = function WebsocketProvider(url, options) { EventEmitter.call(this); options = options || {}; this.url = url; this._customTimeout = options.timeout || 1000 * 15; this.headers = options.headers || {}; this.protocol = options.protocol || undefined; this.reconnectOptions = Object.assign({ auto: false, delay: 5000, maxAttempts: false, onTimeout: false }, options.reconnect); this.clientConfig = options.clientConfig || undefined; // Allow a custom client configuration this.requestOptions = options.requestOptions || undefined; // Allow a custom request options (https://github.com/theturtle32/WebSocket-Node/blob/master/docs/WebSocketClient.md#connectrequesturl-requestedprotocols-origin-headers-requestoptions) this.DATA = 'data'; this.CLOSE = 'close'; this.ERROR = 'error'; this.CONNECT = 'connect'; this.RECONNECT = 'reconnect'; this.connection = null; this.requestQueue = new Map(); this.responseQueue = new Map(); this.reconnectAttempts = 0; this.reconnecting = false; // The w3cwebsocket implementation does not support Basic Auth // username/password in the URL. So generate the basic auth header, and // pass through with any additional headers supplied in constructor var parsedURL = helpers.parseURL(url); if (parsedURL.username && parsedURL.password) { this.headers.authorization = 'Basic ' + helpers.btoa(parsedURL.username + ':' + parsedURL.password); } // When all node core implementations that do not have the // WHATWG compatible URL parser go out of service this line can be removed. if (parsedURL.auth) { this.headers.authorization = 'Basic ' + helpers.btoa(parsedURL.auth); } // make property `connected` which will return the current connection status Object.defineProperty(this, 'connected', { get: function () { return this.connection && this.connection.readyState === this.connection.OPEN; }, enumerable: true }); this.connect(); }; // Inherit from EventEmitter WebsocketProvider.prototype = Object.create(EventEmitter.prototype); WebsocketProvider.prototype.constructor = WebsocketProvider; /** * Connects to the configured node * * @method connect * * @returns {void} */ WebsocketProvider.prototype.connect = function () { this.connection = new Ws(this.url, this.protocol, undefined, this.headers, this.requestOptions, this.clientConfig); this._addSocketListeners(); }; /** * Listener for the `data` event of the underlying WebSocket object * * @method _onMessage * * @returns {void} */ WebsocketProvider.prototype._onMessage = function (e) { var _this = this; this._parseResponse((typeof e.data === 'string') ? e.data : '').forEach(function (result) { if (result.method && result.method.indexOf('_subscription') !== -1) { _this.emit(_this.DATA, result); return; } var id = result.id; // get the id which matches the returned id if (Array.isArray(result)) { id = result[0].id; } if (_this.responseQueue.has(id)) { if (_this.responseQueue.get(id).callback !== undefined) { _this.responseQueue.get(id).callback(false, result); } _this.responseQueue.delete(id); } }); }; /** * Listener for the `open` event of the underlying WebSocket object * * @method _onConnect * * @returns {void} */ WebsocketProvider.prototype._onConnect = function () { this.emit(this.CONNECT); this.reconnectAttempts = 0; this.reconnecting = false; if (this.requestQueue.size > 0) { var _this = this; this.requestQueue.forEach(function (request, key) { _this.send(request.payload, request.callback); _this.requestQueue.delete(key); }); } }; /** * Listener for the `close` event of the underlying WebSocket object * * @method _onClose * * @returns {void} */ WebsocketProvider.prototype._onClose = function (event) { var _this = this; if (this.reconnectOptions.auto && (![1000, 1001].includes(event.code) || event.wasClean === false)) { this.reconnect(); return; } this.emit(this.CLOSE, event); if (this.requestQueue.size > 0) { this.requestQueue.forEach(function (request, key) { request.callback(errors.ConnectionNotOpenError(event)); _this.requestQueue.delete(key); }); } if (this.responseQueue.size > 0) { this.responseQueue.forEach(function (request, key) { request.callback(errors.InvalidConnection('on WS', event)); _this.responseQueue.delete(key); }); } this._removeSocketListeners(); this.removeAllListeners(); }; /** * Will add the required socket listeners * * @method _addSocketListeners * * @returns {void} */ WebsocketProvider.prototype._addSocketListeners = function () { this.connection.addEventListener('message', this._onMessage.bind(this)); this.connection.addEventListener('open', this._onConnect.bind(this)); this.connection.addEventListener('close', this._onClose.bind(this)); }; /** * Will remove all socket listeners * * @method _removeSocketListeners * * @returns {void} */ WebsocketProvider.prototype._removeSocketListeners = function () { this.connection.removeEventListener('message', this._onMessage); this.connection.removeEventListener('open', this._onConnect); this.connection.removeEventListener('close', this._onClose); }; /** * Will parse the response and make an array out of it. * * @method _parseResponse * * @param {String} data * * @returns {Array} */ WebsocketProvider.prototype._parseResponse = function (data) { var _this = this, returnValues = []; // DE-CHUNKER var dechunkedData = data .replace(/\}[\n\r]?\{/g, '}|--|{') // }{ .replace(/\}\][\n\r]?\[\{/g, '}]|--|[{') // }][{ .replace(/\}[\n\r]?\[\{/g, '}|--|[{') // }[{ .replace(/\}\][\n\r]?\{/g, '}]|--|{') // }]{ .split('|--|'); dechunkedData.forEach(function (data) { // prepend the last chunk if (_this.lastChunk) data = _this.lastChunk + data; var result = null; try { result = JSON.parse(data); } catch (e) { _this.lastChunk = data; // start timeout to cancel all requests clearTimeout(_this.lastChunkTimeout); _this.lastChunkTimeout = setTimeout(function () { if (_this.reconnectOptions.auto && _this.reconnectOptions.onTimeout) { _this.reconnect(); return; } _this.emit(_this.ERROR, errors.ConnectionTimeout(_this._customTimeout)); if (_this.requestQueue.size > 0) { _this.requestQueue.forEach(function (request, key) { request.callback(errors.ConnectionTimeout(_this._customTimeout)); _this.requestQueue.delete(key); }); } }, _this._customTimeout); return; } // cancel timeout and set chunk to null clearTimeout(_this.lastChunkTimeout); _this.lastChunk = null; if (result) returnValues.push(result); }); return returnValues; }; /** * Does check if the provider is connecting and will add it to the queue or will send it directly * * @method send * * @param {Object} payload * @param {Function} callback * * @returns {void} */ WebsocketProvider.prototype.send = function (payload, callback) { var _this = this; var id = payload.id; var request = { payload: payload, callback: callback }; if (Array.isArray(payload)) { id = payload[0].id; } if (this.connection.readyState === this.connection.CONNECTING || this.reconnecting) { this.requestQueue.set(id, request); return; } if (this.connection.readyState !== this.connection.OPEN) { this.requestQueue.delete(id); this.emit(this.ERROR, errors.ConnectionNotOpenError()); request.callback(errors.ConnectionNotOpenError()); return; } this.responseQueue.set(id, request); this.requestQueue.delete(id); try { this.connection.send(JSON.stringify(request.payload)); } catch (error) { request.callback(error); _this.responseQueue.delete(id); } }; /** * Resets the providers, clears all callbacks * * @method reset * * @returns {void} */ WebsocketProvider.prototype.reset = function () { this.responseQueue.clear(); this.requestQueue.clear(); this.removeAllListeners(); this._removeSocketListeners(); this._addSocketListeners(); }; /** * Closes the current connection with the given code and reason arguments * * @method disconnect * * @param {number} code * @param {string} reason * * @returns {void} */ WebsocketProvider.prototype.disconnect = function (code, reason) { this._removeSocketListeners(); this.connection.close(code || 1000, reason); }; /** * Returns the desired boolean. * * @method supportsSubscriptions * * @returns {boolean} */ WebsocketProvider.prototype.supportsSubscriptions = function () { return true; }; /** * Removes the listeners and reconnects to the socket. * * @method reconnect * * @returns {void} */ WebsocketProvider.prototype.reconnect = function () { var _this = this; this.reconnecting = true; if (this.responseQueue.size > 0) { this.responseQueue.forEach(function (request, key) { request.callback(errors.PendingRequestsOnReconnectingError()); _this.responseQueue.delete(key); }); } if (!this.reconnectOptions.maxAttempts || this.reconnectAttempts < this.reconnectOptions.maxAttempts) { setTimeout(function () { _this.reconnectAttempts++; _this._removeSocketListeners(); _this.emit(_this.RECONNECT, _this.reconnectAttempts); _this.connect(); }, this.reconnectOptions.delay); return; } this.emit(this.ERROR, errors.MaxAttemptsReachedOnReconnectingError()); this.reconnecting = false; if (this.requestQueue.size > 0) { this.requestQueue.forEach(function (request, key) { request.callback(errors.MaxAttemptsReachedOnReconnectingError()); _this.requestQueue.delete(key); }); } }; module.exports = WebsocketProvider;