aozhiwei 0a973d12e3 1
2022-01-27 17:32:21 +08:00

356 lines
12 KiB
JavaScript

/*
This file is part of web3.js.
web3.js is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
web3.js is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @file WebsocketProvider.js
* @authors: Samuel Furter <samuel@ethereum.org>, Fabian Vogelsteller <fabian@ethereum.org>
* @date 2019
*/
'use strict';
var EventEmitter = require('eventemitter3');
var helpers = require('./helpers.js');
var errors = require('web3-core-helpers').errors;
var Ws = require('websocket').w3cwebsocket;
/**
* @param {string} url
* @param {Object} options
*
* @constructor
*/
var WebsocketProvider = function WebsocketProvider(url, options) {
EventEmitter.call(this);
options = options || {};
this.url = url;
this._customTimeout = options.timeout || 1000 * 15;
this.headers = options.headers || {};
this.protocol = options.protocol || undefined;
this.reconnectOptions = Object.assign({
auto: false,
delay: 5000,
maxAttempts: false,
onTimeout: false
}, options.reconnect);
this.clientConfig = options.clientConfig || undefined; // Allow a custom client configuration
this.requestOptions = options.requestOptions || undefined; // Allow a custom request options (https://github.com/theturtle32/WebSocket-Node/blob/master/docs/WebSocketClient.md#connectrequesturl-requestedprotocols-origin-headers-requestoptions)
this.DATA = 'data';
this.CLOSE = 'close';
this.ERROR = 'error';
this.CONNECT = 'connect';
this.RECONNECT = 'reconnect';
this.connection = null;
this.requestQueue = new Map();
this.responseQueue = new Map();
this.reconnectAttempts = 0;
this.reconnecting = false;
// The w3cwebsocket implementation does not support Basic Auth
// username/password in the URL. So generate the basic auth header, and
// pass through with any additional headers supplied in constructor
var parsedURL = helpers.parseURL(url);
if (parsedURL.username && parsedURL.password) {
this.headers.authorization = 'Basic ' + helpers.btoa(parsedURL.username + ':' + parsedURL.password);
}
// When all node core implementations that do not have the
// WHATWG compatible URL parser go out of service this line can be removed.
if (parsedURL.auth) {
this.headers.authorization = 'Basic ' + helpers.btoa(parsedURL.auth);
}
// make property `connected` which will return the current connection status
Object.defineProperty(this, 'connected', {
get: function () {
return this.connection && this.connection.readyState === this.connection.OPEN;
},
enumerable: true
});
this.connect();
};
// Inherit from EventEmitter
WebsocketProvider.prototype = Object.create(EventEmitter.prototype);
WebsocketProvider.prototype.constructor = WebsocketProvider;
/**
* Connects to the configured node
*
* @method connect
*
* @returns {void}
*/
WebsocketProvider.prototype.connect = function () {
this.connection = new Ws(this.url, this.protocol, undefined, this.headers, this.requestOptions, this.clientConfig);
this._addSocketListeners();
};
/**
* Listener for the `data` event of the underlying WebSocket object
*
* @method _onMessage
*
* @returns {void}
*/
WebsocketProvider.prototype._onMessage = function (e) {
var _this = this;
this._parseResponse((typeof e.data === 'string') ? e.data : '').forEach(function (result) {
if (result.method && result.method.indexOf('_subscription') !== -1) {
_this.emit(_this.DATA, result);
return;
}
var id = result.id;
// get the id which matches the returned id
if (Array.isArray(result)) {
id = result[0].id;
}
if (_this.responseQueue.has(id)) {
if (_this.responseQueue.get(id).callback !== undefined) {
_this.responseQueue.get(id).callback(false, result);
}
_this.responseQueue.delete(id);
}
});
};
/**
* Listener for the `open` event of the underlying WebSocket object
*
* @method _onConnect
*
* @returns {void}
*/
WebsocketProvider.prototype._onConnect = function () {
this.emit(this.CONNECT);
this.reconnectAttempts = 0;
this.reconnecting = false;
if (this.requestQueue.size > 0) {
var _this = this;
this.requestQueue.forEach(function (request, key) {
_this.send(request.payload, request.callback);
_this.requestQueue.delete(key);
});
}
};
/**
* Listener for the `close` event of the underlying WebSocket object
*
* @method _onClose
*
* @returns {void}
*/
WebsocketProvider.prototype._onClose = function (event) {
var _this = this;
if (this.reconnectOptions.auto && (![1000, 1001].includes(event.code) || event.wasClean === false)) {
this.reconnect();
return;
}
this.emit(this.CLOSE, event);
if (this.requestQueue.size > 0) {
this.requestQueue.forEach(function (request, key) {
request.callback(errors.ConnectionNotOpenError(event));
_this.requestQueue.delete(key);
});
}
if (this.responseQueue.size > 0) {
this.responseQueue.forEach(function (request, key) {
request.callback(errors.InvalidConnection('on WS', event));
_this.responseQueue.delete(key);
});
}
this._removeSocketListeners();
this.removeAllListeners();
};
/**
* Will add the required socket listeners
*
* @method _addSocketListeners
*
* @returns {void}
*/
WebsocketProvider.prototype._addSocketListeners = function () {
this.connection.addEventListener('message', this._onMessage.bind(this));
this.connection.addEventListener('open', this._onConnect.bind(this));
this.connection.addEventListener('close', this._onClose.bind(this));
};
/**
* Will remove all socket listeners
*
* @method _removeSocketListeners
*
* @returns {void}
*/
WebsocketProvider.prototype._removeSocketListeners = function () {
this.connection.removeEventListener('message', this._onMessage);
this.connection.removeEventListener('open', this._onConnect);
this.connection.removeEventListener('close', this._onClose);
};
/**
* Will parse the response and make an array out of it.
*
* @method _parseResponse
*
* @param {String} data
*
* @returns {Array}
*/
WebsocketProvider.prototype._parseResponse = function (data) {
var _this = this, returnValues = [];
// DE-CHUNKER
var dechunkedData = data
.replace(/\}[\n\r]?\{/g, '}|--|{') // }{
.replace(/\}\][\n\r]?\[\{/g, '}]|--|[{') // }][{
.replace(/\}[\n\r]?\[\{/g, '}|--|[{') // }[{
.replace(/\}\][\n\r]?\{/g, '}]|--|{') // }]{
.split('|--|');
dechunkedData.forEach(function (data) {
// prepend the last chunk
if (_this.lastChunk)
data = _this.lastChunk + data;
var result = null;
try {
result = JSON.parse(data);
}
catch (e) {
_this.lastChunk = data;
// start timeout to cancel all requests
clearTimeout(_this.lastChunkTimeout);
_this.lastChunkTimeout = setTimeout(function () {
if (_this.reconnectOptions.auto && _this.reconnectOptions.onTimeout) {
_this.reconnect();
return;
}
_this.emit(_this.ERROR, errors.ConnectionTimeout(_this._customTimeout));
if (_this.requestQueue.size > 0) {
_this.requestQueue.forEach(function (request, key) {
request.callback(errors.ConnectionTimeout(_this._customTimeout));
_this.requestQueue.delete(key);
});
}
}, _this._customTimeout);
return;
}
// cancel timeout and set chunk to null
clearTimeout(_this.lastChunkTimeout);
_this.lastChunk = null;
if (result)
returnValues.push(result);
});
return returnValues;
};
/**
* Does check if the provider is connecting and will add it to the queue or will send it directly
*
* @method send
*
* @param {Object} payload
* @param {Function} callback
*
* @returns {void}
*/
WebsocketProvider.prototype.send = function (payload, callback) {
var _this = this;
var id = payload.id;
var request = { payload: payload, callback: callback };
if (Array.isArray(payload)) {
id = payload[0].id;
}
if (this.connection.readyState === this.connection.CONNECTING || this.reconnecting) {
this.requestQueue.set(id, request);
return;
}
if (this.connection.readyState !== this.connection.OPEN) {
this.requestQueue.delete(id);
this.emit(this.ERROR, errors.ConnectionNotOpenError());
request.callback(errors.ConnectionNotOpenError());
return;
}
this.responseQueue.set(id, request);
this.requestQueue.delete(id);
try {
this.connection.send(JSON.stringify(request.payload));
}
catch (error) {
request.callback(error);
_this.responseQueue.delete(id);
}
};
/**
* Resets the providers, clears all callbacks
*
* @method reset
*
* @returns {void}
*/
WebsocketProvider.prototype.reset = function () {
this.responseQueue.clear();
this.requestQueue.clear();
this.removeAllListeners();
this._removeSocketListeners();
this._addSocketListeners();
};
/**
* Closes the current connection with the given code and reason arguments
*
* @method disconnect
*
* @param {number} code
* @param {string} reason
*
* @returns {void}
*/
WebsocketProvider.prototype.disconnect = function (code, reason) {
this._removeSocketListeners();
this.connection.close(code || 1000, reason);
};
/**
* Returns the desired boolean.
*
* @method supportsSubscriptions
*
* @returns {boolean}
*/
WebsocketProvider.prototype.supportsSubscriptions = function () {
return true;
};
/**
* Removes the listeners and reconnects to the socket.
*
* @method reconnect
*
* @returns {void}
*/
WebsocketProvider.prototype.reconnect = function () {
var _this = this;
this.reconnecting = true;
if (this.responseQueue.size > 0) {
this.responseQueue.forEach(function (request, key) {
request.callback(errors.PendingRequestsOnReconnectingError());
_this.responseQueue.delete(key);
});
}
if (!this.reconnectOptions.maxAttempts ||
this.reconnectAttempts < this.reconnectOptions.maxAttempts) {
setTimeout(function () {
_this.reconnectAttempts++;
_this._removeSocketListeners();
_this.emit(_this.RECONNECT, _this.reconnectAttempts);
_this.connect();
}, this.reconnectOptions.delay);
return;
}
this.emit(this.ERROR, errors.MaxAttemptsReachedOnReconnectingError());
this.reconnecting = false;
if (this.requestQueue.size > 0) {
this.requestQueue.forEach(function (request, key) {
request.callback(errors.MaxAttemptsReachedOnReconnectingError());
_this.requestQueue.delete(key);
});
}
};
module.exports = WebsocketProvider;