143 lines
4.7 KiB
JavaScript
143 lines
4.7 KiB
JavaScript
"use strict";
|
|
module.exports = Service;
|
|
|
|
var util = require("../util/minimal");
|
|
|
|
// Extends EventEmitter: build the prototype on top of util.EventEmitter's prototype,
// then point `constructor` back at Service (the fresh prototype object has no own one).
Service.prototype = Object.create(util.EventEmitter.prototype);
Service.prototype.constructor = Service;
|
|
|
|
/**
 * A service method callback as used by {@link rpc.ServiceMethod|ServiceMethod}.
 *
 * Differs from {@link RPCImplCallback} in that it is an actual callback of a service method, which is never called with `response = null`.
 * @typedef rpc.ServiceMethodCallback
 * @template TRes extends Message<TRes>
 * @type {function}
 * @param {Error|null} error Error, if any
 * @param {TRes} [response] Response message
 * @returns {undefined}
 */
|
|
|
|
/**
 * A service method that is part of a {@link rpc.Service}, as created by {@link Service.create}.
 * @typedef rpc.ServiceMethod
 * @template TReq extends Message<TReq>
 * @template TRes extends Message<TRes>
 * @type {function}
 * @param {TReq|Properties<TReq>} request Request message or plain object
 * @param {rpc.ServiceMethodCallback<TRes>} [callback] Node-style callback called with the error, if any, and the response message
 * @returns {Promise<Message<TRes>>} Promise if `callback` has been omitted, otherwise `undefined`
 */
|
|
|
|
/**
 * Constructs a new RPC service instance.
 * @classdesc An RPC service as returned by {@link Service#create}.
 * @exports rpc.Service
 * @extends util.EventEmitter
 * @constructor
 * @param {RPCImpl} rpcImpl RPC implementation
 * @param {boolean} [requestDelimited=false] Whether requests are length-delimited
 * @param {boolean} [responseDelimited=false] Whether responses are length-delimited
 * @throws {TypeError} If `rpcImpl` is not a function
 */
function Service(rpcImpl, requestDelimited, responseDelimited) {

    // Reject an unusable implementation before any state is set on the instance.
    var isCallable = typeof rpcImpl === "function";
    if (!isCallable) {
        throw TypeError("rpcImpl must be a function");
    }

    // Run the parent constructor against this instance.
    util.EventEmitter.call(this);

    /**
     * RPC implementation. Becomes `null` once the service is ended.
     * @type {RPCImpl|null}
     */
    this.rpcImpl = rpcImpl;

    /**
     * Whether requests are length-delimited.
     * @type {boolean}
     */
    this.requestDelimited = !!requestDelimited;

    /**
     * Whether responses are length-delimited.
     * @type {boolean}
     */
    this.responseDelimited = !!responseDelimited;
}
|
|
|
|
/**
 * Calls a service method through {@link rpc.Service#rpcImpl|rpcImpl}.
 *
 * The request is encoded with `requestCtor` (using `encodeDelimited` when `requestDelimited` is set, `encode`
 * otherwise) and handed to `rpcImpl` together with `method` and an internal callback. A response that is not already
 * an instance of `responseCtor` is decoded with it (using `decodeDelimited` when `responseDelimited` is set, `decode`
 * otherwise). Errors are emitted as `error` events and passed to `callback`; decoded responses are emitted as `data`
 * events and passed to `callback`. A `null` response without an error ends the service and does not invoke `callback`.
 *
 * When `callback` is omitted, the call is delegated to `util.asPromise` and its result is returned. When `callback`
 * is given and `rpcImpl` is invoked without throwing, whatever `rpcImpl` returns is passed through.
 * @param {Method|rpc.ServiceMethod<TReq,TRes>} method Reflected or static method
 * @param {Constructor<TReq>} requestCtor Request constructor
 * @param {Constructor<TRes>} responseCtor Response constructor
 * @param {TReq|Properties<TReq>} request Request message or plain object
 * @param {rpc.ServiceMethodCallback<TRes>} callback Service callback
 * @returns {undefined}
 * @throws {TypeError} If `request` is falsy
 * @template TReq extends Message<TReq>
 * @template TRes extends Message<TRes>
 */
Service.prototype.rpcCall = function rpcCall(method, requestCtor, responseCtor, request, callback) {

    if (!request) {
        throw TypeError("request must be specified");
    }

    var service = this;

    // No callback supplied: re-enter this function through the promise adapter.
    if (!callback) {
        return util.asPromise(rpcCall, service, method, requestCtor, responseCtor, request);
    }

    // The service has been ended: report asynchronously so the caller never sees a synchronous callback here.
    if (!service.rpcImpl) {
        setTimeout(function notifyEnded() {
            callback(Error("already ended"));
        }, 0);
        return undefined;
    }

    // Emits the `error` event, then forwards the error to the caller's callback.
    function fail(error) {
        service.emit("error", error, method);
        return callback(error);
    }

    // Receives the raw result from rpcImpl.
    function rpcCallback(err, response) {

        if (err) {
            return fail(err);
        }

        // A `null` response is the implementation's signal that the service has ended.
        if (response === null) {
            service.end(/* endedByRPC */ true);
            return undefined;
        }

        var message = response;
        if (!(message instanceof responseCtor)) {
            try {
                message = responseCtor[service.responseDelimited ? "decodeDelimited" : "decode"](message);
            } catch (decodeError) {
                return fail(decodeError);
            }
        }

        service.emit("data", message, method);
        return callback(null, message);
    }

    try {
        return service.rpcImpl(
            method,
            requestCtor[service.requestDelimited ? "encodeDelimited" : "encode"](request).finish(),
            rpcCallback
        );
    } catch (thrown) {
        // Encoding or the implementation threw synchronously: emit now, but defer the callback.
        service.emit("error", thrown, method);
        setTimeout(function notifyThrown() {
            callback(thrown);
        }, 0);
        return undefined;
    }
};
|
|
|
|
/**
 * Ends this service and emits the `end` event.
 *
 * Does nothing when the service has already been ended (`rpcImpl` is `null`). Otherwise `rpcImpl` is cleared, the
 * `end` event is emitted and `off()` is called on the emitter without arguments.
 * @param {boolean} [endedByRPC=false] Whether the service has been ended by the RPC implementation.
 * @returns {rpc.Service} `this`
 */
Service.prototype.end = function end(endedByRPC) {

    // Already ended: nothing to signal, nothing to emit.
    if (!this.rpcImpl) {
        return this;
    }

    // Ended by the caller rather than by the implementation: signal end to rpcImpl.
    if (!endedByRPC) {
        this.rpcImpl(null, null, null);
    }

    this.rpcImpl = null;

    var emitter = this.emit("end");
    emitter.off();

    return this;
};
|