const net = require("net"); var JS_WS_CLIENT_TYPE = "js-websocket"; var JS_WS_CLIENT_VERSION = "0.0.1"; var Protocol = require("./pomelo-protocol"); var Package = Protocol.Package; var Message = Protocol.Message; var EventEmitter = require("./emitter"); const protobuf = require("./pomelo-protobuf"); var RES_OK = 200; var RES_FAIL = 500; var RES_OLD_CLIENT = 501; class PomeloClient extends EventEmitter { constructor() { super(); this.socket = null; this.reqId = 0; this.callbacks = {}; this.handlers = {}; //Map from request id to route this.routeMap = {}; this.heartbeatInterval = 0; this.heartbeatTimeout = 0; this.nextHeartbeatTimeout = 0; this.gapThreshold = 100; // heartbeat gap threashold this.heartbeatId = null; this.heartbeatTimeoutId = null; this.handshakeCallback = null; this.handshakeBuffer = { sys: { type: JS_WS_CLIENT_TYPE, version: JS_WS_CLIENT_VERSION, }, user: {}, }; this.initCallback = null; this.handlers[Package.TYPE_HANDSHAKE] = this.handshake.bind(this); this.handlers[Package.TYPE_HEARTBEAT] = this.heartbeat.bind(this); this.handlers[Package.TYPE_DATA] = this.onData.bind(this); this.handlers[Package.TYPE_KICK] = this.onKick.bind(this); } init(params, cb) { this.initCallback = cb; var host = params.host; var port = params.port; this.handshakeBuffer.user = params.user; this.handshakeCallback = params.handshakeCallback; this.initWebSocket(host, port, cb); } disconnect() { if (this.socket) { if (this.socket.disconnect) this.socket.disconnect(); if (this.socket.close) this.socket.close(); console.log("disconnect"); this.socket = null; } if (this.heartbeatId) { clearTimeout(this.heartbeatId); this.heartbeatId = null; } if (this.heartbeatTimeoutId) { clearTimeout(this.heartbeatTimeoutId); this.heartbeatTimeoutId = null; } } request(route, msg, cb) { if (arguments.length === 2 && typeof msg === "function") { cb = msg; msg = {}; } else { msg = msg || {}; } route = route || msg.route; if (!route) { return; } this.reqId++; this.sendMessage(this.reqId, route, msg); this.callbacks[this.reqId] = cb; this.routeMap[this.reqId] = route; } notify(route, msg) { msg = msg || {}; this.sendMessage(0, route, msg); } sendMessage(reqId, route, msg) { var type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY; //compress message by protobuf var protos = !!this.data.protos ? this.data.protos.client : {}; if (!!protos[route]) { msg = protobuf.encode(route, msg); } else { msg = Protocol.strencode(JSON.stringify(msg)); } var compressRoute = 0; if (this.dict && this.dict[route]) { route = this.dict[route]; compressRoute = 1; } msg = Message.encode(reqId, type, compressRoute, route, msg); var packet = Package.encode(Package.TYPE_DATA, msg); this.send(packet); } initWebSocket(host, port, cb) { console.log("connect to " + host + ":" + port); this.socket = net.connect(port, host, (event) => { var obj = Package.encode( Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(this.handshakeBuffer)) ); this.send(obj); }); this.socket.on("data", (event) => { this.processPackage(Package.decode(event), cb); // new package arrived, update the heartbeat timeout if (this.heartbeatTimeout) { this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout; } }); this.socket.on("error", (event) => { this.emit("io-error", event); console.error("socket error: ", event); }); this.socket.on("close", (event) => { this.emit("close", event); console.error("socket close: ", event); }); } send(packet) { this.socket.write(packet); } processPackage(msg) { this.handlers[msg.type](msg.body); } heartbeat(data) { if (!this.heartbeatInterval) { // no heartbeat return; } var obj = Package.encode(Package.TYPE_HEARTBEAT); if (this.heartbeatTimeoutId) { clearTimeout(this.heartbeatTimeoutId); this.heartbeatTimeoutId = null; } if (this.heartbeatId) { // already in a heartbeat interval return; } this.heartbeatId = setTimeout( function () { this.heartbeatId = null; this.send(obj); this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout; this.heartbeatTimeoutId = setTimeout( this.heartbeatTimeoutCb.bind(this), this.heartbeatTimeout ); }.bind(this), this.heartbeatInterval ); } heartbeatTimeoutCb() { var gap = this.nextHeartbeatTimeout - Date.now(); if (gap > this.gapThreshold) { this.heartbeatTimeoutId = setTimeout(this.heartbeatTimeoutCb, gap); } else { console.error("server heartbeat timeout"); this.emit("heartbeat timeout"); this.disconnect(); } } handshake(data) { data = JSON.parse(Protocol.strdecode(data)); if (data.code === RES_OLD_CLIENT) { this.emit("error", "client version not fullfill"); return; } if (data.code !== RES_OK) { this.emit("error", "handshake fail"); return; } this.handshakeInit(data); var obj = Package.encode(Package.TYPE_HANDSHAKE_ACK); this.send(obj); if (this.initCallback) { this.initCallback(this.socket); this.initCallback = null; } } handshakeInit(data) { if (data.sys && data.sys.heartbeat) { this.heartbeatInterval = data.sys.heartbeat * 1000; // heartbeat interval this.heartbeatTimeout = this.heartbeatInterval * 2; // max heartbeat timeout } else { this.heartbeatInterval = 0; this.heartbeatTimeout = 0; } this.initData(data); if (typeof this.handshakeCallback === "function") { this.handshakeCallback(data.user); } } initData(data) { if (!data || !data.sys) { return; } this.data = this.data || {}; var dict = data.sys.dict; var protos = data.sys.protos; //Init compress dict if (dict) { this.data.dict = dict; this.data.abbrs = {}; for (var route in dict) { this.data.abbrs[dict[route]] = route; } } //Init protobuf protos if (protos) { this.data.protos = { server: protos.server || {}, client: protos.client || {}, }; if (!!protobuf) { protobuf.init({ encoderProtos: protos.client, decoderProtos: protos.server, }); } } } onData(data) { //probuff decode var msg = Message.decode(data); if (msg.id > 0) { msg.route = this.routeMap[msg.id]; delete this.routeMap[msg.id]; if (!msg.route) { return; } } msg.body = this.deCompose(msg); this.processMessage(msg); } deCompose(msg) { var protos = !!this.data.protos ? this.data.protos.server : {}; var abbrs = this.data.abbrs; var route = msg.route; //Decompose route from dict if (msg.compressRoute) { if (!abbrs[route]) { return {}; } route = msg.route = abbrs[route]; } if (!!protos[route]) { return protobuf.decode(route, msg.body); } else { return JSON.parse(Protocol.strdecode(msg.body)); } return msg; } processMessage(msg) { if(!msg.id) { // server push message this.emit(msg.route, msg.body); return; } //if have a id then find the callback function with the request var cb = this.callbacks[msg.id]; delete this.callbacks[msg.id]; if(typeof cb !== 'function') { return; } cb(msg.body); return; }; onKick(data) { this.emit("onKick"); } } module.exports = PomeloClient;