diff --git a/package.json b/package.json index 1d4a7f5..fe8e677 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "@types/express": "^4.17.1", "@types/express-rate-limit": "^5.1.1", "@types/mongoose": "5.10.3", + "@types/redis": "^2.8.28", "ts-node": "^8.1.0", "ts-node-dev": "^1.0.0-pre.63", "typescript": "^3.4.5" @@ -42,6 +43,7 @@ "express-jwt": "^5.3.1", "express-rate-limit": "^5.2.3", "fs-jetpack": "^4.1.0", - "mongoose": "5.10.3" + "mongoose": "5.10.3", + "redis": "^2.8.0" } } diff --git a/src/common/Debug.ts b/src/common/Debug.ts index 095aba5..8a497dc 100644 --- a/src/common/Debug.ts +++ b/src/common/Debug.ts @@ -12,5 +12,7 @@ export const assistLog = debug('jc:assist'); export const cardLog = debug('jc:card'); +export const sysLog = debug('jc:sys'); + export const error = debug('jc:error'); error.log = console.error.bind(console); diff --git a/src/common/WebApi.ts b/src/common/WebApi.ts index 8088f72..dabe1bf 100644 --- a/src/common/WebApi.ts +++ b/src/common/WebApi.ts @@ -1,8 +1,6 @@ import axios from 'axios' -import { Config } from '../cfg/Config' import { debugRoom, error } from './Debug' - -let config: Config = require('../../config/config.json') +import { Service } from '../service/Service' /** * 获取卡组详情 @@ -11,8 +9,9 @@ let config: Config = require('../../config/config.json') * @param {string} cardgroup * @return {Promise>} */ -export function getCardGroup(accountid: string, heroid: number, cardgroup: string) { - return axios.get(`${ config.info_svr }/${ accountid }/group_info/${ heroid }/${ cardgroup }`) +export async function getCardGroup(accountid: string, heroid: number, cardgroup: string) { + const infoHost = await new Service().getInfoSvr() + return axios.get(`${ infoHost }/${ accountid }/group_info/${ heroid }/${ cardgroup }`) .then(function (response) { let res = response.data if (res.errcode) { @@ -23,8 +22,9 @@ export function getCardGroup(accountid: string, heroid: number, cardgroup: strin }) } -export function getUserInfo(accountid: string, isMatch: boolean) { - return axios.post(`${ config.info_svr }/${ accountid }/uinfo`, {isMatch}) +export async function getUserInfo(accountid: string, isMatch: boolean) { + const infoHost = await new Service().getInfoSvr() + return axios.post(`${ infoHost }/${ accountid }/uinfo`, {isMatch}) .then(function (response) { let res = response.data if (res.errcode) { @@ -35,9 +35,10 @@ export function getUserInfo(accountid: string, isMatch: boolean) { }) } -export function randomUserInfo(min: number, max: number, accounts: string[]) { +export async function randomUserInfo(min: number, max: number, accounts: string[]) { const data = {min, max, accounts} - return axios.post(`${ config.info_svr }/randomrobot`, data) + const infoHost = await new Service().getInfoSvr() + return axios.post(`${ infoHost }/randomrobot`, data) .then(function (response) { let res = response.data if (res.errcode) { @@ -54,9 +55,10 @@ export function randomUserInfo(min: number, max: number, accounts: string[]) { * @param {number | string} heroid * @return {Promise>} */ -export function requestUnlockHero(accountid: string, heroid: number | string) { +export async function requestUnlockHero(accountid: string, heroid: number | string) { let data = { 'type': 0 } - return axios.post(`${ config.info_svr }/${ accountid }/hero/unlock/${ heroid }`, data) + const infoHost = await new Service().getInfoSvr() + return axios.post(`${ infoHost }/${ accountid }/hero/unlock/${ heroid }`, data) } /** @@ -65,10 +67,10 @@ export function requestUnlockHero(accountid: string, heroid: number | string) { */ export async function reportGameResult(data: any) { let dataStr = JSON.stringify(data) - + const infoHost = await new Service().getInfoSvr() let reqConfig = { method: 'post', - url: `${ config.info_svr }/record/save`, + url: `${ infoHost }/record/save`, headers: { 'Content-Type': 'application/json' }, @@ -88,9 +90,13 @@ export async function reportGameResult(data: any) { export async function useItem(accountid: string, itemid: number, count: number) { const data = { itemid, count } let dataStr = JSON.stringify(data) + const infoHost = await new Service().getInfoSvr() + if (!infoHost) { + error('no info host found!!!') + } let reqConfig = { method: 'post', - url: `${ config.info_svr }/${ accountid }/useitem`, + url: `${ infoHost }/${ accountid }/useitem`, headers: { 'Content-Type': 'application/json' }, @@ -109,7 +115,8 @@ export async function useItem(accountid: string, itemid: number, count: number) */ export async function checkMatchTicket(accountid: string, matchid: string) { const data = { matchid } - const url = `${ config.info_svr }/${ accountid }/beginmatch` + const infoHost = await new Service().getInfoSvr() + const url = `${ infoHost }/${ accountid }/beginmatch` let reqConfig = { method: 'post', url, @@ -135,8 +142,12 @@ export async function checkMatchTicket(accountid: string, matchid: string) { * 创建机器人 * @param data */ -export function createRobot(data: any) { - axios.get('http://127.0.0.1:2500/robot/create', { +export async function createRobot(data: any) { + const robotHost = await new Service().getRobotSvr() + if (!robotHost) { + error('no robot host found!!!') + } + axios.get(`${robotHost}/robot/create`, { params: data }).then((res) => { debugRoom(`caeate robot result: `, res.data) diff --git a/src/index.ts b/src/index.ts index 5673fd9..895a3a4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,6 +10,8 @@ import {initData} from "./common/GConfig"; import {Config} from "./cfg/Config"; import {RankedLobbyRoom} from "./rooms/RankedLobbyRoom"; import { MongooseDriver } from 'colyseus/lib/matchmaker/drivers/MongooseDriver' +import { Service } from './service/Service' +import { RedisClient } from './redis/RedisClient' require('./rooms/MSender'); require('./rooms/RoomExtMethod'); @@ -26,8 +28,8 @@ app.use(cors()); app.use(express.json()) initData(); const server = http.createServer(app); -let port -let gameServer +let port: number +let gameServer: Server if (isProd) { port = Number(process.env.PORT) + Number(process.env.NODE_APP_INSTANCE); gameServer = new Server({ @@ -70,10 +72,18 @@ app.use("/matchmake/", apiLimiter); // see https://expressjs.com/en/guide/behind-proxies.html app.set('trust proxy', 1); +let opts = {url: config.redis} +new RedisClient(opts) +const services = new Service() gameServer.onShutdown(function () { console.log("master process is being shut down!"); //TODO:: 保存所有数据至db, 重启时恢复 }); -gameServer.listen(port).then(() => { -}); -console.log(`Listening on ws://localhost:${port}`) +services.registSelf(port) + .then(() => { + return gameServer.listen(port) + }).then(() => { + console.log(`Listening on ws://localhost:${port}`) + }); + + diff --git a/src/redis/RedisClient.ts b/src/redis/RedisClient.ts new file mode 100644 index 0000000..0eac011 --- /dev/null +++ b/src/redis/RedisClient.ts @@ -0,0 +1,273 @@ +import redis from 'redis'; +import { promisify } from 'util'; +import { singleton } from '../decorators/singleton.decorator' + +type Callback = (...args: any[]) => void; + +@singleton +export class RedisClient { + public pub: redis.RedisClient; + public sub: redis.RedisClient; + + protected subscribeAsync: any; + protected unsubscribeAsync: any; + protected publishAsync: any; + + protected subscriptions: { [channel: string]: Callback[] } = {}; + + protected smembersAsync: any; + protected sismemberAsync: any; + protected hgetAsync: any; + protected hlenAsync: any; + protected pubsubAsync: any; + protected incrAsync: any; + protected decrAsync: any; + + constructor(opts?: redis.ClientOpts) { + this.sub = redis.createClient(opts); + this.pub = redis.createClient(opts); + + // no listener limit + this.sub.setMaxListeners(0); + + // create promisified pub/sub methods. + this.subscribeAsync = promisify(this.sub.subscribe).bind(this.sub); + this.unsubscribeAsync = promisify(this.sub.unsubscribe).bind(this.sub); + + this.publishAsync = promisify(this.pub.publish).bind(this.pub); + + // create promisified redis methods. + this.smembersAsync = promisify(this.pub.smembers).bind(this.pub); + this.sismemberAsync = promisify(this.pub.sismember).bind(this.pub); + this.hlenAsync = promisify(this.pub.hlen).bind(this.pub); + this.hgetAsync = promisify(this.pub.hget).bind(this.pub); + this.pubsubAsync = promisify(this.pub.pubsub).bind(this.pub); + this.decrAsync = promisify(this.pub.decr).bind(this.pub); + this.incrAsync = promisify(this.pub.incr).bind(this.pub); + } + + public async subscribe(topic: string, callback: Callback) { + if (!this.subscriptions[topic]) { + this.subscriptions[topic] = []; + } + + this.subscriptions[topic].push(callback); + + if (this.sub.listeners('message').length === 0) { + this.sub.addListener('message', this.handleSubscription); + } + + await this.subscribeAsync(topic); + + return this; + } + + public async unsubscribe(topic: string, callback?: Callback) { + if (callback) { + const index = this.subscriptions[topic].indexOf(callback); + this.subscriptions[topic].splice(index, 1); + + } else { + this.subscriptions[topic] = []; + } + + if (this.subscriptions[topic].length === 0) { + await this.unsubscribeAsync(topic); + } + + return this; + } + + public async publish(topic: string, data: any) { + if (data === undefined) { + data = false; + } + + await this.publishAsync(topic, JSON.stringify(data)); + } + + public async exists(roomId: string): Promise { + return (await this.pubsubAsync('channels', roomId)).length > 0; + } + + public async setex(key: string, value: string, seconds: number) { + return new Promise((resolve) => + this.pub.setex(key, seconds, value, resolve)); + } + + public async expire(key: string, seconds: number) { + return new Promise((resolve) => + this.pub.expire(key, seconds, resolve)); + } + + public async get(key: string) { + return new Promise((resolve, reject) => { + this.pub.get(key, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async del(roomId: string) { + return new Promise((resolve) => { + this.pub.del(roomId, resolve); + }); + } + + public async sadd(key: string, value: any) { + return new Promise((resolve) => { + this.pub.sadd(key, value, resolve); + }); + } + + public async smembers(key: string): Promise { + return await this.smembersAsync(key); + } + + public async sismember(key: string, field: string): Promise { + return await this.sismemberAsync(key, field); + } + + public async srem(key: string, value: any) { + return new Promise((resolve) => { + this.pub.srem(key, value, resolve); + }); + } + + public async scard(key: string) { + return new Promise((resolve, reject) => { + this.pub.scard(key, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + public async srandmember(key: string) { + return new Promise((resolve, reject) => { + this.pub.srandmember(key, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async sinter(...keys: string[]) { + return new Promise((resolve, reject) => { + this.pub.sinter(...keys, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async zadd(key: string, value: any, member: string) { + return new Promise((resolve) => { + this.pub.zadd(key, value, member, resolve); + }); + } + public async zrangebyscore(key: string, min: number, max: number) { + return new Promise((resolve, reject) => { + this.pub.zrangebyscore(key, min, max, 'withscores', (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async zcard(key: string) { + return new Promise((resolve, reject) => { + this.pub.zcard(key, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async zrevrank(key: string, member: string) { + return new Promise((resolve, reject) => { + this.pub.zrevrank(key, member, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async zscore(key: string, member: string) { + return new Promise((resolve, reject) => { + this.pub.zscore(key, member, (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + + public async zrevrange(key: string, start: number, end: number) { + return new Promise((resolve, reject) => { + this.pub.zrevrange(key, start, end, 'withscores', (err, data) => { + if (err) { return reject(err); } + resolve(data); + }); + }); + } + + public async hset(key: string, field: string, value: string) { + return new Promise((resolve) => { + this.pub.hset(key, field, value, resolve); + }); + } + + public async hincrby(key: string, field: string, value: number) { + return new Promise((resolve) => { + this.pub.hincrby(key, field, value, resolve); + }); + } + + public async hget(key: string, field: string) { + return await this.hgetAsync(key, field); + } + + public async hgetall(key: string) { + return new Promise<{ [key: string]: string }>((resolve, reject) => { + this.pub.hgetall(key, (err, values) => { + if (err) { return reject(err); } + resolve(values); + }); + }); + } + + public async hdel(key: string, field: string) { + return new Promise((resolve, reject) => { + this.pub.hdel(key, field, (err, ok) => { + if (err) { return reject(err); } + resolve(ok); + }); + }); + } + + public async hlen(key: string): Promise { + return await this.hlenAsync(key); + } + + public async incr(key: string): Promise { + return await this.incrAsync(key); + } + + public async decr(key: string): Promise { + return await this.decrAsync(key); + } + + protected handleSubscription = (channel: string, message: string) => { + if (this.subscriptions[channel]) { + for (let i = 0, l = this.subscriptions[channel].length; i < l; i++) { + try { + this.subscriptions[channel][i](JSON.parse(message)); + } catch (err) { + this.subscriptions[channel][i](message); + } + + } + } + } +} diff --git a/src/robot.ts b/src/robot.ts index 4d02fbc..82eb9c0 100644 --- a/src/robot.ts +++ b/src/robot.ts @@ -4,19 +4,42 @@ import cors from "cors"; import bodyParser from 'body-parser'; import mainCtrl from './robot/main.controller'; import {initData} from "./common/GConfig"; +import { + registerGracefulShutdown, + registService, + unRegistService +} from './utils/system.util' +import { error } from './common/Debug' +import { Config } from './cfg/Config' +import { RedisClient } from './redis/RedisClient' require('./common/Extend'); const app = express() const port = Number(process.env.PORT || 2500); +let config: Config = require('../config/config.json'); +const isProd = process.env.NODE_ENV === 'production' initData(); app.use(cors()); app.use(express.json()); app.use(bodyParser.json({})); app.use('/robot', mainCtrl); +let gracefullyShutdown = function (exit: boolean = true, err?: Error) { + unRegistService(port).then(()=>{}) + .catch(err => { + error('error unregistservice') + }) +} -app.listen(port, function () { +let opts = {url: config.redis} +new RedisClient(opts) + +app.listen(port, async function () { + if (isProd) { + await registerGracefulShutdown((err) => gracefullyShutdown(true, err)); + await registService(port) + } console.log(`App is listening on port ${port}!`); }); diff --git a/src/rooms/GeneralRoom.ts b/src/rooms/GeneralRoom.ts index 37908ff..e82f038 100644 --- a/src/rooms/GeneralRoom.ts +++ b/src/rooms/GeneralRoom.ts @@ -21,6 +21,7 @@ import {GameRestartCommand} from "./commands/GameRestartCommand"; import {RobotClient} from "../robot/RobotClient"; import {ChangePetCommand} from "./commands/ChangePetCommand"; import {createRobot} from "../common/WebApi"; +import { Service } from '../service/Service' export class GeneralRoom extends Room { dispatcher = new Dispatcher(this); @@ -250,13 +251,14 @@ export class GeneralRoom extends Room { } } - addRobot(playerId?: string) { + async addRobot(playerId?: string) { + const host = new Service().selfHost let data = { - host: 'ws://127.0.0.1:2567', + host, room: this.roomId, sessionId: playerId } - createRobot(data); + await createRobot(data); } addAssistClient(sessionId: string) { diff --git a/src/rooms/commands/GameResultCommand.ts b/src/rooms/commands/GameResultCommand.ts index de5e7e5..15e016c 100644 --- a/src/rooms/commands/GameResultCommand.ts +++ b/src/rooms/commands/GameResultCommand.ts @@ -177,11 +177,11 @@ export class GameResultCommand extends Command { await self.room.unlock(); await self.room.setPrivate(false); //开启匹配定时, 长时间没匹配到人的话, 添加机器人 - let timeOutWaitingPlayer = function () { + let timeOutWaitingPlayer = async function () { let count = self.room.maxClients - self.room.clientCount(); if (count > 0) { for (let i = 0; i < count; i++) { - self.room.addRobot(); + await self.room.addRobot(); } } } diff --git a/src/rooms/commands/OnJoinCommand.ts b/src/rooms/commands/OnJoinCommand.ts index 582040d..0e54420 100644 --- a/src/rooms/commands/OnJoinCommand.ts +++ b/src/rooms/commands/OnJoinCommand.ts @@ -69,11 +69,11 @@ export class OnJoinCommand extends Command 0) { for (let i = 0; i < count; i++) { - self.room.addRobot(); + await self.room.addRobot(); } } } @@ -92,7 +92,7 @@ export class OnJoinCommand extends Command= this.room.maxClients - this.room.robotCount) { for (let i = 0; i < this.room.robotCount; i++) { this.room.robotCount --; - self.room.addRobot(); + await self.room.addRobot(); } } } diff --git a/src/service/Service.ts b/src/service/Service.ts new file mode 100644 index 0000000..3062880 --- /dev/null +++ b/src/service/Service.ts @@ -0,0 +1,115 @@ +import { singleton } from '../decorators/singleton.decorator' +import { getNodeList, listen, Action } from './discovery' +import { error, sysLog } from '../common/Debug' +import { Config } from '../cfg/Config' +import ip from 'internal-ip' +const config: Config = require('../../config/config.json') +const isProd = process.env.NODE_ENV === 'production' +@singleton +export class Service { + /** + * info server的key名 + * @type {string} + */ + public static readonly INFO_NODE = 'poker:infosvr' + /** + * info server发布的channel + * @type {string} + */ + public static readonly INFO_CHANNEL = 'poker:infosvr:discovery' + /** + * robot server的key名 + * @type {string} + */ + public static readonly ROBOT_NODE = 'poker:robot' + /** + * robot server发布的channel + * @type {string} + */ + public static readonly ROBOT_CHANNEL = 'poker:robot:discovery' + + public selfHost: string + + public serviceMap: Map = new Map() + + + constructor() { + this.serviceMap.set(Service.INFO_NODE, []) + this.serviceMap.set(Service.ROBOT_NODE, []) + if (isProd) { + this.discoveryServices() + } + } + + public discoveryServices() { + listen(Service.INFO_CHANNEL, (action: Action, address: string) => { + sysLog("LISTEN: info channel ", action, address); + if (action === 'add') { + this.register(Service.INFO_NODE, address); + + } else if (action == 'remove') { + this.unregister(Service.INFO_NODE, address); + } + }).then(() => { + sysLog('subscribe to info channel success') + }).catch(err => { + sysLog('subscribe to info channel error', err) + }) + listen(Service.ROBOT_CHANNEL, (action: Action, address: string) => { + sysLog("LISTEN: robot channel", action, address); + if (action === 'add') { + this.register(Service.ROBOT_NODE, address); + + } else if (action == 'remove') { + this.unregister(Service.ROBOT_NODE, address); + } + }).then(() => { + sysLog('subscribe to robot channel success') + }).catch(err => { + sysLog('subscribe to robot channel error', err) + }) + } + + public async getInfoSvr() { + let key = Service.INFO_NODE + if (!this.serviceMap.has(key) || this.serviceMap.get(key).length == 0) { + let svrList = await getNodeList(key) + this.serviceMap.set(key, svrList) + } + let svrList = this.serviceMap.get(key) + if (svrList.length == 0 || !isProd) { + error('no info service found') + return config.info_svr + } + return 'http://' + svrList.randomOne()+'/svr' + } + + public async getRobotSvr() { + let key = Service.ROBOT_NODE + if (!this.serviceMap.has(key) || this.serviceMap.get(key).length == 0) { + let svrList = await getNodeList(key) + this.serviceMap.set(key, svrList) + } + let svrList = this.serviceMap.get(key) + if (svrList.length == 0 || !isProd) { + error('no robot service found') + return 'http://127.0.0.1:2500' + } + return 'http://' + svrList.randomOne() + } + + private register(type: string, address: string) { + let svrList = this.serviceMap.get(type) + svrList.pushOnce(address) + } + private unregister(type: string, address: string) { + let svrList = this.serviceMap.get(type) + svrList.removeEx(address) + } + + public async registSelf(port: number) { + const host = process.env.SELF_HOSTNAME || await ip.v4(); + this.selfHost = `ws://${host}:${port}` + } + +} diff --git a/src/service/discovery.ts b/src/service/discovery.ts new file mode 100644 index 0000000..74b6706 --- /dev/null +++ b/src/service/discovery.ts @@ -0,0 +1,16 @@ +import { RedisClient } from '../redis/RedisClient' + +export type Action = 'add' | 'remove'; + + +export async function getNodeList(type: string): Promise { + const nodes: string[] = await new RedisClient().smembers(type); + return nodes +} + +export async function listen(channel: string, cb: (action: Action, address: string) => void) { + await new RedisClient().subscribe(channel, function (message: any){ + const [action, address] = message.split(","); + cb(action, address); + }) +} diff --git a/src/utils/system.util.ts b/src/utils/system.util.ts new file mode 100644 index 0000000..627450e --- /dev/null +++ b/src/utils/system.util.ts @@ -0,0 +1,39 @@ +import { RedisClient } from '../redis/RedisClient' +import ip from 'internal-ip' +import { Service } from '../service/Service' + +const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM', 'SIGUSR2'] + + +export function registerGracefulShutdown(callback: (err?: Error) => void) { + /** + * Gracefully shutdown on uncaught errors + */ + process.on('uncaughtException', (err) => { + console.error(err) + callback(err) + }) + + signals.forEach((signal) => { + process.once(signal, () => callback()) + }) +} + +async function getNodeAddress(port: number) { + const host = process.env.SELF_HOSTNAME || await ip.v4() + return `${ host }:${ port }` +} + +export async function registService(port: number) { + const address = await getNodeAddress(port) + const client = new RedisClient() + await client.sadd(Service.ROBOT_NODE, address) + await client.publish(Service.ROBOT_CHANNEL, `add,${address}`) +} + +export async function unRegistService(port: number) { + const address = await getNodeAddress(port) + const client = new RedisClient() + await client.srem(Service.ROBOT_NODE, address) + await client.publish(Service.ROBOT_CHANNEL, `remove,${address}`) +}