# -*- coding: utf-8 -*-
#!/usr/bin/python
"""Leaderboard (rank) server for game2001.

Reads per-user statistics from the configured MySQL instances, keeps the
top entries per channel for three leaderboards (kills, wins, integral) and
stores each board as a JSON array in Redis.

Two jobs are scheduled from ``__main__``:

* ``fullUpdateRank``      - rebuilds every board from scratch once a day.
* ``incrementUpdateRank`` - merges recently modified users into the boards
  already stored in Redis, every ``INCREMENT_INTERVAL_SECONDS``.
"""

import sys
sys.path.append('../local_packages')

import q7
import f7
import pymysql
import hashlib
import json
import urllib.request
import base64
import time
import datetime
import redis
import os
import functools

CONFIG_DIR = '../config' if f7.isOnlineEnv() else '/var/data/conf_test/game2001api_rankserver/config'

# Maximum number of entries kept on every leaderboard.
RANK_SIZE = 50

# Number of user rows fetched per MySQL round trip.
PAGE_SIZE = 1000

# Period of the incremental job; also the look-back window of its query.
INCREMENT_INTERVAL_SECONDS = 300

# Position of the ``idx`` column in both SELECT statements below.
_IDX_COLUMN = 8

_FULL_QUERY = ('SELECT accountid, user_name, avatar_url, kills, alive_time,'
               ' harm, win_times, game_times, idx, integral, season_time'
               ' FROM user WHERE idx > %s ORDER BY idx LIMIT 0, ' + str(PAGE_SIZE))

_INCREMENT_QUERY = ('SELECT accountid, user_name, avatar_url, kills, alive_time,'
                    ' harm, win_times, game_times, idx, integral, season_time,'
                    ' kill_modifytime FROM user'
                    ' WHERE kill_modifytime > %s AND idx > %s'
                    ' ORDER BY idx LIMIT 0, ' + str(PAGE_SIZE))


def _loadJsonConfig(file_name):
    """Parse and return the JSON file ``file_name`` located in CONFIG_DIR."""
    with open(CONFIG_DIR + '/' + file_name, 'r') as conf_file:
        return json.loads(conf_file.read())


def _take_pass(elem):
    """Sort key: element 3 of a leaderboard entry."""
    return elem[3]


def _getRedis():
    """Return a Redis client for the last entry of the Redis cluster config.

    Raises:
        ValueError: if the configuration lists no Redis instance.
    """
    redis_conf = getRedisConf()
    if not redis_conf:
        raise ValueError('rankserver.redis.cluster.json lists no redis instance')
    # The original code built one client per entry and returned the last
    # one; only that last entry is ever used.
    conf = redis_conf[-1]
    return redis.Redis(host=conf['host'],
                       port=conf['port'],
                       password=conf['passwd'],
                       charset='utf8')


def getRedisConf():
    """Return the parsed content of rankserver.redis.cluster.json."""
    return _loadJsonConfig('rankserver.redis.cluster.json')


def info(msg):
    """Print ``msg`` prefixed with the current local time and an INFO tag."""
    print(str(datetime.datetime.now()) + '[INFO] ' + msg)


def take_kills(elem):
    """Sort key of the kill board: element 3 (average kills per game)."""
    return elem[3]


def take_game_times(elem):
    """Sort key of the win board: element 7 of a leaderboard entry.

    In the entries built by ``_refreshData`` that slot holds the raw
    ``win_times`` column, despite this function's name.
    """
    return elem[7]


def take_integral_times(elem):
    """Sort key of the integral board: element 8 (integral score)."""
    return elem[8]


def safeDiv(a, b):
    """Return ``a / b``, or 0 when ``b`` is 0."""
    if b == 0:
        return 0
    return a / b


def getDaySeconds(time_val, incdays):
    """Return the UTC+8 midnight timestamp of ``time_val``'s day plus ``incdays`` days."""
    time_zone = 8
    dayseconds = int((time_val + time_zone * 3600) / 3600 / 24 + incdays) * 3600 * 24 - 3600 * time_zone
    return dayseconds


# De-duplicate data
def _delRepeatData(row, data_list):
    """Remove, in place, every entry of ``data_list`` whose account id equals ``row[0]``."""
    data_list[:] = [data for data in data_list if data[0] != row[0]]


# Refresh data
def _refreshData(row, data_list, data_info):
    """Insert the user described by ``row`` into ``data_list`` and trim it.

    Args:
        row: one result row of the user queries above, i.e. (accountid,
            user_name, avatar_url, kills, alive_time, harm, win_times,
            game_times, idx, integral, season_time, ...).
        data_list: leaderboard entries of one channel; modified in place.
        data_info: sort key function applied to the entries.

    The appended entry is (accountid, user_name, avatar_url, kills per game,
    alive_time per game, harm per game, win_times per game, win_times,
    integral). ``data_list`` is then sorted descending by ``data_info`` and
    cut to RANK_SIZE entries.
    """
    game_times = row[7]
    kill = safeDiv(row[3], game_times)
    alive_time = safeDiv(row[4], game_times)
    harm = safeDiv(row[5], game_times)
    win_times = safeDiv(row[6], game_times)

    # The integral counts as 0 once the current time has passed season_time
    # (presumably the season expiry timestamp - confirm against the schema).
    num = row[9]
    if time.time() > row[10]:
        num = 0

    # Depending on the column type the driver returns bytes or str; only
    # bytes need decoding (str has no .decode() on Python 3).
    user_name = row[1]
    if isinstance(user_name, bytes):
        user_name = user_name.decode('utf-8')

    data_list.append((row[0], user_name, row[2], kill, alive_time, harm,
                      win_times, row[6], num))
    data_list.sort(key=data_info, reverse=True)
    if len(data_list) > RANK_SIZE:
        del data_list[RANK_SIZE:]


def _storeRank(r, redis_key, rank_list, sort_key):
    """Sort ``rank_list`` in place (descending) and store its top RANK_SIZE as JSON."""
    rank_list.sort(key=sort_key, reverse=True)
    r.set(redis_key, json.dumps(rank_list[:RANK_SIZE]))


# Update leaderboards
def _updateKillRank(r, channel, kill_list):
    """Write the kill leaderboard of ``channel`` to Redis."""
    _storeRank(r, "game2001api:kill_rank_" + channel, kill_list, take_kills)


def _updateWinRank(r, channel, win_list):
    """Write the win leaderboard of ``channel`` to Redis."""
    _storeRank(r, "game2001api:win_rank_" + channel, win_list, take_game_times)


def _updateScoreRank(r, channel, integral_list):
    """Write the integral leaderboard of ``channel`` to Redis."""
    _storeRank(r, "game2001api:integral_rank_" + channel, integral_list, take_integral_times)


def _publishRanks(r, kill_hash, win_hash, integral_hash):
    """Write every per-channel list of the three boards to Redis."""
    for channel in kill_hash:
        _updateKillRank(r, channel, kill_hash[channel])
    for channel in win_hash:
        _updateWinRank(r, channel, win_hash[channel])
    for channel in integral_hash:
        _updateScoreRank(r, channel, integral_hash[channel])


def _connectMysql(conf):
    """Open a connection to the game database described by ``conf``."""
    return pymysql.connect(host=conf['host'],
                           port=conf['port'],
                           user=conf['user'],
                           passwd=conf['passwd'],
                           db='gamedb2001_' + str(conf['instance_id']),
                           charset='utf8')


def _iterUserRows(conf, query, leading_args=()):
    """Yield every row of ``query`` from one MySQL instance, page by page.

    ``query`` must take ``leading_args`` followed by the last seen ``idx``
    as parameters and must order its result by ``idx``; without that
    ordering a page could skip rows below its highest ``idx``. The
    connection is always closed, even when the consumer fails.
    """
    conn = _connectMysql(conf)
    try:
        cursor = conn.cursor()
        last_idx = 0
        while True:
            cursor.execute(query, tuple(leading_args) + (last_idx,))
            has_data = False
            for row in cursor:
                has_data = True
                yield row
                last_idx = max(row[_IDX_COLUMN], last_idx)
            if not has_data:
                break
            # Brief pause between pages.
            time.sleep(0.001)
    finally:
        conn.close()


def fullUpdateRank():
    """Rebuild all leaderboards from every configured MySQL instance."""
    f7.udplog.info('fullUpdateRank begin')
    mysql_conf = _loadJsonConfig('rankserver.mysql.cluster.json')
    kill_hash = {}
    win_hash = {}
    integral_hash = {}
    boards = (
        (kill_hash, take_kills),               # kill leaderboard
        (win_hash, take_game_times),           # win leaderboard
        (integral_hash, take_integral_times),  # integral leaderboard
    )
    for conf in mysql_conf:
        for row in _iterUserRows(conf, _FULL_QUERY):
            channel = f7.getChannelByAccountId(row[0])
            for rank_hash, sort_key in boards:
                _refreshData(row, rank_hash.setdefault(channel, []), sort_key)

    r = _getRedis()
    _publishRanks(r, kill_hash, win_hash, integral_hash)
    f7.udplog.info('fullUpdateRank end')


# Daily scheduled job: read the data from MySQL, build the leaderboards and
# write them to Redis; PHP then reads Redis and returns them to the client.
def _fullUpdateRank(rushtime):
    """Run ``fullUpdateRank`` as an async task and re-arm it for the next day."""
    def done_callback():
        f7.timer.callAt(q7.getDaySeconds(time.time(), 1) + rushtime,
                        lambda: _fullUpdateRank(rushtime))
    f7.app.createAsyncTask(done_callback, fullUpdateRank, ())


def _loadStoredRank(r, redis_key):
    """Return the leaderboard stored under ``redis_key``, or [] when absent."""
    stored = r.get(redis_key)
    return [] if not stored else json.loads(stored)


# Every 5 minutes: read the rows changed in MySQL and update the leaderboards.
def incrementUpdateRank():
    """Merge recently modified users into the leaderboards stored in Redis.

    Only users whose ``kill_modifytime`` lies within the last
    INCREMENT_INTERVAL_SECONDS are read. A channel's stored board is loaded
    from Redis the first time one of its users is seen; the user's previous
    entry is dropped before the refreshed one is inserted.
    """
    mysql_conf = _loadJsonConfig('rankserver.mysql.cluster.json')
    r = _getRedis()
    kill_hash = {}
    win_hash = {}
    integral_hash = {}
    boards = (
        (kill_hash, 'game2001api:kill_rank_', take_kills),
        (win_hash, 'game2001api:win_rank_', take_game_times),
        (integral_hash, 'game2001api:integral_rank_', take_integral_times),
    )
    # One cutoff for the whole run so that every page and every instance
    # uses the same (never shrinking) window.
    modified_after = time.time() - INCREMENT_INTERVAL_SECONDS
    for conf in mysql_conf:
        for row in _iterUserRows(conf, _INCREMENT_QUERY, (modified_after,)):
            channel = f7.getChannelByAccountId(row[0])
            for rank_hash, key_prefix, sort_key in boards:
                if channel not in rank_hash:
                    rank_hash[channel] = _loadStoredRank(r, key_prefix + channel)
                _delRepeatData(row, rank_hash[channel])
                _refreshData(row, rank_hash[channel], sort_key)

    _publishRanks(r, kill_hash, win_hash, integral_hash)
    f7.udplog.info('incrementUpdateRank end')


# Every 5 minutes: read the rows changed in MySQL and update the leaderboards.
def _incrementUpdateRank(rushtime):
    """Run ``incrementUpdateRank`` as an async task and re-arm it after ``rushtime`` seconds."""
    def done_callback():
        f7.timer.callLater(rushtime, lambda: _incrementUpdateRank(rushtime))
    f7.app.createAsyncTask(done_callback, incrementUpdateRank, ())


if __name__ == "__main__":
    q7.xPrint('pid %d' % os.getpid())
    f7.app.init('/data/logs/game2001_rankserver/logs')
    f7.udplog.info('rankserver start pid:' + str(os.getpid()))

    conf = _loadJsonConfig('rankserver.json')

    # Incremental refresh period, in seconds.
    conf['rushtime'] = INCREMENT_INTERVAL_SECONDS
    f7.timer.callLater(conf['rushtime'],
                       lambda: _incrementUpdateRank(conf['rushtime']))

    # Daily full rebuild: offset in seconds after the next day boundary.
    conf['day_rushtime'] = 5 * 3600
    f7.timer.callAt(q7.getDaySeconds(time.time(), 1) + conf['day_rushtime'],
                    lambda: _fullUpdateRank(conf['day_rushtime']))

    f7.app.listen(conf['listen_port'])
    f7.app.start()