# -*- coding: utf-8 -*- #!/usr/bin/python import pymysql import hashlib import json import urllib.request import base64 import tornado.ioloop import tornado.web import time import datetime import redis import os import functools CONFIG_DIR = '' def IsOnlineEnv(): return os.getenv("SERVER_ENV"); if (IsOnlineEnv()): CONFIG_DIR = '/var/data/conf_test/game2003api_rankserver/config' else: CONFIG_DIR = '../config' def info(msg): print(str(datetime.datetime.now()) + '[INFO] ' + msg) def take_pass(elem): return elem[3] def take_coin_num(elem): return elem[4] def safeDiv(a, b): if b == 0: return 0 else: return a / b #获取channel def getChannel(a): str_list = a.split('_') if len(str_list) < 3: return 0 return str_list[0] def getRedis(): redis_conf = json.loadsmysql_conf = json.loads(open(CONFIG_DIR + '/rankserver.redis.cluster.json', 'r').read()) for conf in redis_conf: r = redis.Redis(host = conf['host'], port = conf['port'], password = conf['passwd'], charset = 'utf8' ) return r; def getRedisConf(): redis_conf = json.loads(open(CONFIG_DIR + '/rankserver.redis.cluster.json', 'r').read()) return redis_conf; def getDaySeconds(time_val, incdays): time_zone = 8 dayseconds = int((time_val + time_zone * 3600)/3600/24 + incdays) * 3600 * 24 - 3600 * time_zone; return dayseconds #数据去重 def delRepeatData(row, data_list): temp_list = [] for data in data_list: if data[0] == row[0]: temp_list.append(data) for temp_data in temp_list: data_list.remove(temp_data) #刷新通关数据 def refreshData(row, pass_list): pass_list.append((row[0], row[1].decode('utf-8'), row[2], row[3], row[4])) pass_list.sort(key=take_pass, reverse=True) if (len(pass_list) > 50): del pass_list[50:] #更新排行榜 def updateRank(r, channel, pass_list): pass_list.sort(key=take_pass, reverse=True) pass_rank = [] for pass_index in range(min(50, len(pass_list))): pass_rank.append(pass_list[pass_index]) r.set("game2003api:pass_rank_" + channel, json.dumps(pass_rank)) def internalDayReadMysqlData(): mysql_conf = json.loads(open(CONFIG_DIR + '/rankserver.mysql.cluster.json', 'r').read()) rank_hash = {} for conf in mysql_conf: conn = pymysql.connect(host = conf['host'], port = conf['port'], user = conf['user'], passwd = conf['passwd'], db = 'gamedb2003_' + str(conf['instance_id']), charset = 'utf8' ) cursor = conn.cursor() last_idx = 0 temp_idx = 0 while 1: cursor.execute('SELECT accountid, user_name, avatar_url, pass, cumul_coin, idx ' 'FROM user WHERE idx > %s LIMIT 0, 5000' % (last_idx)) has_data = False for row in cursor: has_data = True #更新通关榜 channel = getChannel(row[0]) if channel not in rank_hash: rank_hash[channel] = [] refreshData(row, rank_hash[channel]) last_idx = max(row[5], last_idx) time.sleep(0.001); if not has_data: break r = getRedis() for channel in rank_hash: updateRank(r, channel, rank_hash[channel]) #每日定时读取mysql里的数据生成排行榜写入redis后php读取redis返回客户端显示 def dayReadMysqlData(rushtime): internalDayReadMysqlData() tornado.ioloop.IOLoop.current().call_at(getDaySeconds(time.time(), 1) + rushtime, lambda : dayReadMysqlData(rushtime) ) #每5分钟读取mysql里发生改变过的数据更新排行榜 def readMysqlData(rushtime): mysql_conf = json.loads(open(CONFIG_DIR + '/rankserver.mysql.cluster.json', 'r').read()) r = getRedis() rank_hash = {} for conf in mysql_conf: conn = pymysql.connect(host = conf['host'], port = conf['port'], user = conf['user'], passwd = conf['passwd'], db = 'gamedb2003_' + str(conf['instance_id']), charset = 'utf8' ) cursor = conn.cursor() last_idx = 0 temp_idx = 0 while 1: cursor.execute('SELECT accountid, user_name, avatar_url, pass, cumul_coin, idx, modify_time FROM user ' ' WHERE modify_time > %s AND idx > %s LIMIT 0, 1000' % (time.time() - 300, last_idx)) has_data = False for row in cursor: has_data = True channel = getChannel(row[0]) if channel not in rank_hash: rank_list = r.get('game2003api:pass_rank_' + channel) rank_hash[channel] = [] if not rank_list else json.loads(rank_list) #更新通关榜 delRepeatData(row, rank_hash[channel]) refreshData(row, rank_hash[channel]) temp_idx = int(row[5]) if (temp_idx > last_idx) : last_idx = int(row[5]) time.sleep(0.001); if not has_data: break for channel in rank_hash: updateRank(r, channel, rank_hash[channel]) tornado.ioloop.IOLoop.current().call_later(rushtime, lambda : readMysqlData(rushtime) ) class SelfCheckingHandler(tornado.web.RequestHandler): def get(self): self.write(json.dumps({ 'errcode': 0, 'errmsg': '', 'healthy': 1, 'max_rundelay': 10 })) def make_app(): return tornado.web.Application([ (r"/webapp/index[\.]php", SelfCheckingHandler), ]) if __name__ == "__main__": conf = json.loads(open(CONFIG_DIR + '/rankserver.json', 'r').read()) app = make_app() app.listen(conf['listen_port']) conf['rushtime'] = 5 tornado.ioloop.IOLoop.current().call_later(conf['rushtime'], lambda : readMysqlData(conf['rushtime']) ) conf['day_rushtime'] = 5 * 3600 tornado.ioloop.IOLoop.current().call_at(getDaySeconds(time.time(), 1) + conf['day_rushtime'], lambda : dayReadMysqlData(conf['day_rushtime']) ) tornado.ioloop.IOLoop.current().start()