# -*- coding: utf-8 -*- #!/usr/bin/python import sys sys.path.append('../local_packages') import q7 import f7 import pymysql import hashlib import json import urllib.request import base64 import time import datetime import redis import os import functools CONFIG_DIR = '../config' if f7.isOnlineEnv() else '/var/data/conf_test/game2003api_rankserver/config' def _take_pass(elem): return elem[3] def _getRedis(): redis_conf = json.loads(open(CONFIG_DIR + '/rankserver.redis.cluster.json', 'r').read()) for conf in redis_conf: r = redis.Redis(host = conf['host'], port = conf['port'], password = conf['passwd'], charset = 'utf8' ) return r; def getRedisConf(): redis_conf = json.loads(open(CONFIG_DIR + '/rankserver.redis.cluster.json', 'r').read()) return redis_conf; #数据去重 def _delRepeatData(row, data_list): temp_list = [] for data in data_list: if data[0] == row[0]: temp_list.append(data) for temp_data in temp_list: data_list.remove(temp_data) #刷新通关数据 def _refreshData(row, pass_list): pass_list.append((row[0], row[1].decode('utf-8'), row[2], row[3], row[4])) pass_list.sort(key=_take_pass, reverse=True) if (len(pass_list) > 50): del pass_list[50:] #更新排行榜 def _updateRank(r, channel, pass_list): pass_list.sort(key=_take_pass, reverse=True) pass_rank = [] for pass_index in range(min(50, len(pass_list))): pass_rank.append(pass_list[pass_index]) r.set("game2003api:pass_rank_" + channel, json.dumps(pass_rank)) def fullUpdateRank(): mysql_conf = json.loads(open(CONFIG_DIR + '/rankserver.mysql.cluster.json', 'r').read()) rank_hash = {} for conf in mysql_conf: conn = pymysql.connect(host = conf['host'], port = conf['port'], user = conf['user'], passwd = conf['passwd'], db = 'gamedb2003_' + str(conf['instance_id']), charset = 'utf8' ) cursor = conn.cursor() last_idx = 0 temp_idx = 0 while 1: cursor.execute('SELECT accountid, user_name, avatar_url, pass, cumul_coin, idx ' 'FROM user WHERE idx > %s LIMIT 0, 5000' % (last_idx)) has_data = False for row in cursor: has_data = True #更新通关榜 channel = f7.getChannelByAccountId(row[0]) if channel not in rank_hash: rank_hash[channel] = [] _refreshData(row, rank_hash[channel]) last_idx = max(row[5], last_idx) time.sleep(0.001); if not has_data: break r = _getRedis() for channel in rank_hash: _updateRank(r, channel, rank_hash[channel]) #每日定时读取mysql里的数据生成排行榜写入redis后php读取redis返回客户端显示 def _fullUpdateRank(rushtime): def done_callback(): f7.timer.callAt(q7.getDaySeconds(time.time(), 1) + rushtime, lambda : _fullUpdateRank(rushtime)) f7.app.createAsyncTask(done_callback, fullUpdateRank, ()) def incrementUpdateRank(rushtime): mysql_conf = json.loads(open(CONFIG_DIR + '/rankserver.mysql.cluster.json', 'r').read()) r = _getRedis() rank_hash = {} for conf in mysql_conf: conn = pymysql.connect(host = conf['host'], port = conf['port'], user = conf['user'], passwd = conf['passwd'], db = 'gamedb2003_' + str(conf['instance_id']), charset = 'utf8' ) cursor = conn.cursor() last_idx = 0 temp_idx = 0 while 1: cursor.execute('SELECT accountid, user_name, avatar_url, pass, cumul_coin, idx, modify_time FROM user ' ' WHERE modify_time > %s AND idx > %s LIMIT 0, 1000' % (time.time() - 300, last_idx)) has_data = False for row in cursor: has_data = True channel = f7.getChannelByAccountId(row[0]) if channel not in rank_hash: rank_list = r.get('game2003api:pass_rank_' + channel) rank_hash[channel] = [] if not rank_list else json.loads(rank_list) #更新通关榜 _delRepeatData(row, rank_hash[channel]) _refreshData(row, rank_hash[channel]) temp_idx = int(row[5]) if (temp_idx > last_idx) : last_idx = int(row[5]) time.sleep(0.001); if not has_data: break for channel in rank_hash: _updateRank(r, channel, rank_hash[channel]) #每5分钟读取mysql里发生改变过的数据更新排行榜 def _incrementUpdateRank(rushtime): def done_callback(): f7.timer.callLater(rushtime, lambda : _incrementUpdateRank(rushtime)) f7.app.createAsyncTask(done_callback, incrementUpdateRank, ()) if __name__ == "__main__": f7.app.init('/data/logs/game2003_rankserver/logs') f7.udplog.info('rankserver start pid:' + str(os.getpid())) conf = json.loads(open(CONFIG_DIR + '/rankserver.json', 'r').read()) conf['rushtime'] = 300 f7.timer.callLater(conf['rushtime'], lambda : _incrementUpdateRank(conf['rushtime'])) conf['day_rushtime'] = 5 * 3600 f7.timer.callAt(q7.getDaySeconds(time.time(), 1) + conf['day_rushtime'], lambda : _fullUpdateRank(conf['day_rushtime'])) f7.app.listen(conf['listen_port']) f7.app.start()