// 381 lines · 8.8 KiB · JavaScript
// TCP transport from the Node.js standard library.
const net = require("net");

// Client identification; these two values populate the `sys` section of the
// handshake payload sent to the server.
const JS_WS_CLIENT_TYPE = "js-websocket";
const JS_WS_CLIENT_VERSION = "0.0.1";

// Project-local protocol codecs and event emitter base class.
const Protocol = require("./pomelo-protocol");
const Package = Protocol.Package;
const Message = Protocol.Message;
const EventEmitter = require("./emitter");
const protobuf = require("./pomelo-protobuf");

// Response codes compared against the `code` field of the handshake reply.
const RES_OK = 200;
const RES_FAIL = 500;
const RES_OLD_CLIENT = 501;

/**
 * Pomelo client that connects to a server through a TCP socket created with
 * `net.connect` and exchanges data using the `Package` / `Message` codecs.
 *
 * Events emitted through the EventEmitter base class:
 *  - "io-error"           socket "error" event (argument: the socket's error)
 *  - "close"              socket "close" event
 *  - "heartbeat timeout"  no traffic arrived within the heartbeat timeout
 *  - "error"              handshake rejected by the server (argument: reason string)
 *  - "onKick"             a kick package was received
 *  - <route>              server push message (argument: decoded body)
 */
class PomeloClient extends EventEmitter {
  constructor() {
    super();

    this.socket = null;
    this.reqId = 0;
    // Map from request id to the response callback.
    this.callbacks = {};
    // Map from package type to the bound handler method.
    this.handlers = {};
    // Map from request id to route.
    this.routeMap = {};

    this.heartbeatInterval = 0;
    this.heartbeatTimeout = 0;
    this.nextHeartbeatTimeout = 0;
    this.gapThreshold = 100; // heartbeat gap threshold
    this.heartbeatId = null;
    this.heartbeatTimeoutId = null;

    this.handshakeCallback = null;

    this.handshakeBuffer = {
      sys: {
        type: JS_WS_CLIENT_TYPE,
        version: JS_WS_CLIENT_VERSION,
      },
      user: {},
    };

    this.initCallback = null;

    this.handlers[Package.TYPE_HANDSHAKE] = this.handshake.bind(this);
    this.handlers[Package.TYPE_HEARTBEAT] = this.heartbeat.bind(this);
    this.handlers[Package.TYPE_DATA] = this.onData.bind(this);
    this.handlers[Package.TYPE_KICK] = this.onKick.bind(this);

    this._push_callback = null;
  }

  /**
   * Stores the callbacks and handshake settings, then opens the connection.
   *
   * @param {object} params - reads `host`, `port`, `user` (sent in the
   *   handshake payload) and `handshakeCallback` (called with the `user`
   *   field of the handshake reply).
   * @param {Function} [cb] - called once with the socket after a successful handshake.
   * @param {Function} [push] - called as `push(route, body)` for every server push message.
   */
  init(params, cb, push) {
    this._push_callback = push;
    this.initCallback = cb;

    this.handshakeBuffer.user = params.user;
    this.handshakeCallback = params.handshakeCallback;
    this.initWebSocket(params.host, params.port, cb);
  }

  /**
   * Detaches all socket listeners, closes the socket and cancels both
   * heartbeat timers. Safe to call when no socket is open.
   */
  disconnect() {
    const socket = this.socket;
    if (socket) {
      socket.removeAllListeners();
      if (socket.disconnect) socket.disconnect();
      if (socket.close) socket.close();
      // A net.Socket exposes neither disconnect() nor close(); without
      // destroy() the TCP connection would stay open after "disconnecting".
      if (typeof socket.destroy === "function") socket.destroy();
      console.log("disconnect");
      this.socket = null;
    }

    if (this.heartbeatId) {
      clearTimeout(this.heartbeatId);
      this.heartbeatId = null;
    }
    if (this.heartbeatTimeoutId) {
      clearTimeout(this.heartbeatTimeoutId);
      this.heartbeatTimeoutId = null;
    }
  }

  /**
   * Sends a request and registers `cb` to receive the decoded response body.
   * May be called as `request(route, cb)`; the message then defaults to `{}`.
   * Does nothing when neither `route` nor `msg.route` is set.
   *
   * @param {string} route
   * @param {object|Function} [msg]
   * @param {Function} [cb]
   */
  request(route, msg, cb) {
    if (arguments.length === 2 && typeof msg === "function") {
      cb = msg;
      msg = {};
    } else {
      msg = msg || {};
    }
    route = route || msg.route;
    if (!route) {
      return;
    }

    this.reqId++;
    this.sendMessage(this.reqId, route, msg);

    this.callbacks[this.reqId] = cb;
    this.routeMap[this.reqId] = route;
  }

  /**
   * Sends a one-way message (request id 0); no response is tracked.
   *
   * @param {string} route
   * @param {object} [msg] - defaults to `{}`.
   */
  notify(route, msg) {
    this.sendMessage(0, route, msg || {});
  }

  /**
   * Encodes `msg` and writes it to the socket as a data package.
   * The body is protobuf-encoded when the handshake supplied a client proto
   * for `route`, otherwise it is JSON-encoded. The route is replaced by its
   * dictionary code when the route dictionary contains it.
   *
   * @param {number} reqId - 0 selects a notify message, anything else a request.
   * @param {string} route
   * @param {object} msg
   */
  sendMessage(reqId, route, msg) {
    const type = reqId ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY;

    // `this.data` is only populated once a handshake reply with a `sys`
    // section has been processed, so it may still be unset here.
    const data = this.data || {};
    const clientProtos = (data.protos && data.protos.client) || {};

    // Compress the message body with protobuf when a proto is available.
    let body;
    if (clientProtos[route]) {
      body = protobuf.encode(route, msg);
    } else {
      body = Protocol.strencode(JSON.stringify(msg));
    }

    // The handshake stores the route dictionary in `this.data.dict`; an
    // externally assigned `this.dict` is still honoured first.
    const dict = this.dict || data.dict;
    let wireRoute = route;
    let compressRoute = 0;
    if (dict && dict[route]) {
      wireRoute = dict[route];
      compressRoute = 1;
    }

    const encoded = Message.encode(reqId, type, compressRoute, wireRoute, body);
    this.send(Package.encode(Package.TYPE_DATA, encoded));
  }

  /**
   * Opens the TCP connection, sends the handshake package once connected and
   * wires the socket's "data", "error" and "close" events.
   *
   * @param {string} host
   * @param {number} port
   * @param {Function} [cb] - accepted for compatibility; not used here (the
   *   init callback is invoked from `handshake`).
   */
  initWebSocket(host, port, cb) {
    console.log("connect to " + host + ":" + port);

    this.socket = net.connect(port, host, () => {
      const handshakePacket = Package.encode(
        Package.TYPE_HANDSHAKE,
        Protocol.strencode(JSON.stringify(this.handshakeBuffer))
      );
      this.send(handshakePacket);
    });

    this.socket.on("data", (chunk) => {
      // NOTE(review): a TCP "data" chunk is not guaranteed to align with
      // package boundaries; each chunk is handed to Package.decode as-is.
      // Confirm that decode copes with partial or split packages.
      const decoded = Package.decode(chunk);
      if (Array.isArray(decoded)) {
        for (const pkg of decoded) {
          this.processPackage(pkg);
        }
      } else {
        this.processPackage(decoded);
      }
      // New package arrived, push the heartbeat deadline forward.
      if (this.heartbeatTimeout) {
        this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout;
      }
    });

    this.socket.on("error", (event) => {
      this.emit("io-error", event);
      console.error("socket error: ", event);
    });

    this.socket.on("close", (event) => {
      this.emit("close", event);
      console.error("socket close: ", event);
    });
  }

  /**
   * Writes an encoded package to the socket; logs and returns when there is
   * no socket.
   *
   * @param {*} packet - encoded package as produced by `Package.encode`.
   */
  send(packet) {
    if (!this.socket) {
      console.log("socket is not exist");
      return;
    }
    this.socket.write(packet);
  }

  /**
   * Dispatches a decoded package to the handler registered for its type.
   * Any error thrown by the handler (or a missing handler) is logged, not
   * rethrown, so one bad package does not break the socket "data" listener.
   *
   * @param {{type: *, body: *}} msg - decoded package.
   */
  processPackage(msg) {
    try {
      this.handlers[msg.type](msg.body);
    } catch (ex) {
      console.log("processPackage msg: ", msg);
      console.error("processPackage error: ", ex);
    }
  }

  /**
   * Handles a heartbeat package from the server: cancels the pending timeout
   * check and, unless a reply is already scheduled, schedules a heartbeat
   * reply after `heartbeatInterval` followed by a new timeout check.
   *
   * @param {*} data - package body (unused).
   */
  heartbeat(data) {
    if (!this.heartbeatInterval) {
      // Heartbeat is disabled.
      return;
    }

    const heartbeatPacket = Package.encode(Package.TYPE_HEARTBEAT);
    if (this.heartbeatTimeoutId) {
      clearTimeout(this.heartbeatTimeoutId);
      this.heartbeatTimeoutId = null;
    }

    if (this.heartbeatId) {
      // Already inside a heartbeat interval.
      return;
    }

    this.heartbeatId = setTimeout(() => {
      this.heartbeatId = null;
      this.send(heartbeatPacket);

      this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout;
      this.heartbeatTimeoutId = setTimeout(
        () => this.heartbeatTimeoutCb(),
        this.heartbeatTimeout
      );
    }, this.heartbeatInterval);
  }

  /**
   * Timeout check: if the deadline was pushed forward by more than
   * `gapThreshold` ms (because packages arrived meanwhile), waits for the
   * remaining gap; otherwise reports the timeout and disconnects.
   */
  heartbeatTimeoutCb() {
    const gap = this.nextHeartbeatTimeout - Date.now();
    if (gap > this.gapThreshold) {
      // Wrap in an arrow function: passing the bare method to setTimeout
      // would lose `this` on the next firing.
      this.heartbeatTimeoutId = setTimeout(() => this.heartbeatTimeoutCb(), gap);
    } else {
      console.error("server heartbeat timeout");
      this.emit("heartbeat timeout");
      this.disconnect();
    }
  }

  /**
   * Handles the server's handshake reply. Emits "error" and stops when the
   * reply code is RES_OLD_CLIENT or anything other than RES_OK; otherwise
   * applies the handshake data, sends the handshake ACK and invokes the init
   * callback once with the socket.
   *
   * @param {*} data - encoded handshake reply body (a JSON string once decoded).
   */
  handshake(data) {
    data = JSON.parse(Protocol.strdecode(data));
    if (data.code === RES_OLD_CLIENT) {
      this.emit("error", "client version not fullfill");
      return;
    }

    if (data.code !== RES_OK) {
      this.emit("error", "handshake fail");
      return;
    }

    this.handshakeInit(data);

    this.send(Package.encode(Package.TYPE_HANDSHAKE_ACK));
    if (this.initCallback) {
      this.initCallback(this.socket);
      this.initCallback = null;
    }
  }

  /**
   * Applies the handshake reply: configures the heartbeat timings, stores the
   * dictionary/protos and calls the user's handshake callback with `data.user`.
   *
   * @param {object} data - parsed handshake reply.
   */
  handshakeInit(data) {
    if (data.sys && data.sys.heartbeat) {
      // Server value scaled by 1000 (presumably seconds -> ms; confirm with server).
      this.heartbeatInterval = data.sys.heartbeat * 1000;
      // Max heartbeat timeout: twice the interval.
      this.heartbeatTimeout = this.heartbeatInterval * 2;
    } else {
      this.heartbeatInterval = 0;
      this.heartbeatTimeout = 0;
    }

    this.initData(data);

    if (typeof this.handshakeCallback === "function") {
      this.handshakeCallback(data.user);
    }
  }

  /**
   * Stores the route dictionary (plus its reverse map `abbrs`) and the
   * protobuf definitions from `data.sys` into `this.data`, and initialises
   * the protobuf codec. Does nothing when `data` or `data.sys` is missing.
   *
   * @param {object} data - parsed handshake reply.
   */
  initData(data) {
    if (!data || !data.sys) {
      return;
    }
    this.data = this.data || {};
    const dict = data.sys.dict;
    const protos = data.sys.protos;

    // Init the route compression dictionary and its code -> route reverse map.
    if (dict) {
      this.data.dict = dict;
      this.data.abbrs = {};

      for (const route of Object.keys(dict)) {
        this.data.abbrs[dict[route]] = route;
      }
    }

    // Init protobuf protos.
    if (protos) {
      this.data.protos = {
        server: protos.server || {},
        client: protos.client || {},
      };
      if (protobuf) {
        protobuf.init({
          encoderProtos: protos.client,
          decoderProtos: protos.server,
        });
      }
    }
  }

  /**
   * Handles a data package: decodes the message, restores the route of a
   * response from `routeMap` (dropping responses with no recorded route),
   * decodes the body and hands the message to `processMessage`.
   *
   * @param {*} data - encoded message.
   */
  onData(data) {
    const msg = Message.decode(data);

    if (msg.id > 0) {
      msg.route = this.routeMap[msg.id];
      delete this.routeMap[msg.id];
      if (!msg.route) {
        return;
      }
    }

    msg.body = this.deCompose(msg);

    this.processMessage(msg);
  }

  /**
   * Decodes a message body. Expands a compressed route through the reverse
   * dictionary first (also updating `msg.route`); returns `{}` when the
   * compressed route is unknown. The body is protobuf-decoded when a server
   * proto exists for the route, otherwise parsed as JSON.
   *
   * @param {object} msg - decoded message (`route`, `compressRoute`, `body`).
   * @returns {object} decoded body.
   */
  deCompose(msg) {
    // `this.data` / `abbrs` are unset when the handshake carried no `sys`
    // section or no dictionary; treat them as empty.
    const data = this.data || {};
    const serverProtos = (data.protos && data.protos.server) || {};
    const abbrs = data.abbrs || {};
    let route = msg.route;

    // Expand the route from the dictionary.
    if (msg.compressRoute) {
      if (!abbrs[route]) {
        return {};
      }
      route = msg.route = abbrs[route];
    }

    if (serverProtos[route]) {
      return protobuf.decode(route, msg.body);
    }
    return JSON.parse(Protocol.strdecode(msg.body));
  }

  /**
   * Delivers a decoded message: a message without an id is a server push
   * (emitted under its route and passed to the push callback); a message with
   * an id is passed to, and removes, the callback registered by `request`.
   *
   * @param {object} msg - decoded message with `id`, `route` and `body`.
   */
  processMessage(msg) {
    if (!msg.id) {
      // Server push message.
      this.emit(msg.route, msg.body);
      if (this._push_callback) {
        this._push_callback(msg.route, msg.body);
      }
      return;
    }

    // The message has an id: find the callback registered for that request.
    const cb = this.callbacks[msg.id];
    delete this.callbacks[msg.id];
    if (typeof cb !== "function") {
      return;
    }

    cb(msg.body);
  }

  /**
   * Handles a kick package by emitting "onKick".
   *
   * @param {*} data - package body (unused).
   */
  onKick(data) {
    this.emit("onKick");
  }

  /**
   * Tears the client down: drops handshake data and handlers, removes all
   * event listeners, disconnects and releases the stored callbacks.
   */
  destructor() {
    this.data = null;
    this.handlers = null;
    this.removeAllListeners();
    this.disconnect();

    this.initCallback = null;
    this._push_callback = null;
  }
}

module.exports = PomeloClient;