r2/proxy/lib/deps/pomelo-jsclient.js
lightings 0f34c40d1f ...
2023-06-09 15:09:01 +08:00

354 lines
8.2 KiB
JavaScript

// TCP transport used by PomeloClient (net.connect).
const net = require("net");

// Client identification placed in the handshake `sys` section.
const JS_WS_CLIENT_TYPE = "js-websocket";
const JS_WS_CLIENT_VERSION = "0.0.1";

// Wire-format helpers: Package frames the connection-level packets,
// Message frames request/notify/push payloads inside data packets.
const Protocol = require("./pomelo-protocol");
const Package = Protocol.Package;
const Message = Protocol.Message;

// Base class that provides emit()/on() for the client.
const EventEmitter = require("./emitter");
// Optional protobuf codec for routes listed in the server-provided protos.
const protobuf = require("./pomelo-protobuf");

// Handshake response codes checked by PomeloClient#handshake.
const RES_OK = 200;
const RES_FAIL = 500;
const RES_OLD_CLIENT = 501;
class PomeloClient extends EventEmitter {
constructor() {
super();
this.socket = null;
this.reqId = 0;
this.callbacks = {};
this.handlers = {};
//Map from request id to route
this.routeMap = {};
this.heartbeatInterval = 0;
this.heartbeatTimeout = 0;
this.nextHeartbeatTimeout = 0;
this.gapThreshold = 100; // heartbeat gap threashold
this.heartbeatId = null;
this.heartbeatTimeoutId = null;
this.handshakeCallback = null;
this.handshakeBuffer = {
sys: {
type: JS_WS_CLIENT_TYPE,
version: JS_WS_CLIENT_VERSION,
},
user: {},
};
this.initCallback = null;
this.handlers[Package.TYPE_HANDSHAKE] = this.handshake.bind(this);
this.handlers[Package.TYPE_HEARTBEAT] = this.heartbeat.bind(this);
this.handlers[Package.TYPE_DATA] = this.onData.bind(this);
this.handlers[Package.TYPE_KICK] = this.onKick.bind(this);
this._push_callback = null;
}
init(params, cb, push) {
this._push_callback = push;
this.initCallback = cb;
var host = params.host;
var port = params.port;
this.handshakeBuffer.user = params.user;
this.handshakeCallback = params.handshakeCallback;
this.initWebSocket(host, port, cb);
}
disconnect() {
if (this.socket) {
if (this.socket.disconnect) this.socket.disconnect();
if (this.socket.close) this.socket.close();
console.log("disconnect");
this.socket = null;
}
if (this.heartbeatId) {
clearTimeout(this.heartbeatId);
this.heartbeatId = null;
}
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = null;
}
}
request(route, msg, cb) {
if (arguments.length === 2 && typeof msg === "function") {
cb = msg;
msg = {};
} else {
msg = msg || {};
}
route = route || msg.route;
if (!route) {
return;
}
this.reqId++;
this.sendMessage(this.reqId, route, msg);
this.callbacks[this.reqId] = cb;
this.routeMap[this.reqId] = route;
}
notify(route, msg) {
msg = msg || {};
this.sendMessage(0, route, msg);
}
sendMessage(reqId, route, msg) {
var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY;
//compress message by protobuf
var protos = !!this.data.protos ? this.data.protos.client : {};
if (!!protos[route]) {
msg = protobuf.encode(route, msg);
} else {
msg = Protocol.strencode(JSON.stringify(msg));
}
var compressRoute = 0;
if (this.dict && this.dict[route]) {
route = this.dict[route];
compressRoute = 1;
}
msg = Message.encode(reqId, type, compressRoute, route, msg);
var packet = Package.encode(Package.TYPE_DATA, msg);
this.send(packet);
}
initWebSocket(host, port, cb) {
console.log("connect to " + host + ":" + port);
this.socket = net.connect(port, host, (event) => {
var obj = Package.encode(
Package.TYPE_HANDSHAKE,
Protocol.strencode(JSON.stringify(this.handshakeBuffer))
);
this.send(obj);
});
this.socket.on("data", (event) => {
this.processPackage(Package.decode(event), cb);
// new package arrived, update the heartbeat timeout
if (this.heartbeatTimeout) {
this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout;
}
});
this.socket.on("error", (event) => {
this.emit("io-error", event);
console.error("socket error: ", event);
});
this.socket.on("close", (event) => {
this.emit("close", event);
console.error("socket close: ", event);
});
}
send(packet) {
if (!this.socket) {
console.log("socket is not exist");
return;
}
this.socket.write(packet);
}
processPackage(msg) {
try {
this.handlers[msg.type](msg.body);
}
catch (ex) {
console.log("processPackage msg: ", msg);
console.error("processPackage error: ", ex);
}
}
heartbeat(data) {
if (!this.heartbeatInterval) {
// no heartbeat
return;
}
var obj = Package.encode(Package.TYPE_HEARTBEAT);
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = null;
}
if (this.heartbeatId) {
// already in a heartbeat interval
return;
}
this.heartbeatId = setTimeout(
function () {
this.heartbeatId = null;
this.send(obj);
this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout;
this.heartbeatTimeoutId = setTimeout(
this.heartbeatTimeoutCb.bind(this),
this.heartbeatTimeout
);
}.bind(this),
this.heartbeatInterval
);
}
heartbeatTimeoutCb() {
var gap = this.nextHeartbeatTimeout - Date.now();
if (gap > this.gapThreshold) {
this.heartbeatTimeoutId = setTimeout(this.heartbeatTimeoutCb, gap);
} else {
console.error("server heartbeat timeout");
this.emit("heartbeat timeout");
this.disconnect();
}
}
handshake(data) {
data = JSON.parse(Protocol.strdecode(data));
if (data.code === RES_OLD_CLIENT) {
this.emit("error", "client version not fullfill");
return;
}
if (data.code !== RES_OK) {
this.emit("error", "handshake fail");
return;
}
this.handshakeInit(data);
var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK);
this.send(obj);
if (this.initCallback) {
this.initCallback(this.socket);
this.initCallback = null;
}
}
handshakeInit(data) {
if (data.sys && data.sys.heartbeat) {
this.heartbeatInterval = data.sys.heartbeat * 1000; // heartbeat interval
this.heartbeatTimeout = this.heartbeatInterval * 2; // max heartbeat timeout
} else {
this.heartbeatInterval = 0;
this.heartbeatTimeout = 0;
}
this.initData(data);
if (typeof this.handshakeCallback === "function") {
this.handshakeCallback(data.user);
}
}
initData(data) {
if (!data || !data.sys) {
return;
}
this.data = this.data || {};
var dict = data.sys.dict;
var protos = data.sys.protos;
//Init compress dict
if (dict) {
this.data.dict = dict;
this.data.abbrs = {};
for (var route in dict) {
this.data.abbrs[dict[route]] = route;
}
}
//Init protobuf protos
if (protos) {
this.data.protos = {
server: protos.server || {},
client: protos.client || {},
};
if (!!protobuf) {
protobuf.init({
encoderProtos: protos.client,
decoderProtos: protos.server,
});
}
}
}
onData(data) {
//probuff decode
var msg = Message.decode(data);
if (msg.id > 0) {
msg.route = this.routeMap[msg.id];
delete this.routeMap[msg.id];
if (!msg.route) {
return;
}
}
msg.body = this.deCompose(msg);
this.processMessage(msg);
}
deCompose(msg) {
var protos = !!this.data.protos ? this.data.protos.server : {};
var abbrs = this.data.abbrs;
var route = msg.route;
//Decompose route from dict
if (msg.compressRoute) {
if (!abbrs[route]) {
return {};
}
route = msg.route = abbrs[route];
}
if (!!protos[route]) {
return protobuf.decode(route, msg.body);
} else {
return JSON.parse(Protocol.strdecode(msg.body));
}
return msg;
}
processMessage(msg) {
if(!msg.id) {
// server push message
this.emit(msg.route, msg.body);
if (this._push_callback) {
this._push_callback(msg.route, msg.body);
}
return;
}
//if have a id then find the callback function with the request
var cb = this.callbacks[msg.id];
delete this.callbacks[msg.id];
if(typeof cb !== 'function') {
return;
}
cb(msg.body);
return;
};
onKick(data) {
this.emit("onKick");
}
}
module.exports = PomeloClient;