aozhiwei da9c86495e 1
2020-12-07 19:25:59 +08:00

279 lines
10 KiB
Python

# -*- coding: utf-8 -*-
#!/usr/bin/python
import sys
sys.path.append('../local_packages')
import q7
import f7
import pymysql
import hashlib
import json
import urllib.request
import base64
import time
import datetime
import redis
import os
import functools
CONFIG_DIR = '../config' if f7.isOnlineEnv() else '/var/data/conf_test/game2004api_rankserver/config'
RANK_KEY_PREFIX = 'game2004api:'
DBNAME_PREFIX = 'gamedb2004_'
IGNORE_GUEST_ACCOUNT = 0
def _take_pass(elem):
return elem[3]
def _getRedis():
    """Build a redis client from ``rankserver.redis.cluster.json``.

    The config file holds a list of ``{host, port, passwd}`` entries.

    Returns:
        A ``redis.Redis`` client for the first configured entry, or ``None``
        when the list is empty.
    """
    # Close the config file deterministically instead of leaking the handle.
    with open(CONFIG_DIR + '/rankserver.redis.cluster.json', 'r') as conf_file:
        redis_conf = json.loads(conf_file.read())
    # NOTE(review): the original layout did not make clear whether the first
    # or the last entry should win; this uses the first. The two readings are
    # identical for a single-node config - confirm if more nodes are listed.
    for conf in redis_conf:
        # `encoding` is the supported spelling of the deprecated `charset`
        # keyword and has the same meaning.
        return redis.Redis(host=conf['host'],
                           port=conf['port'],
                           password=conf['passwd'],
                           encoding='utf8')
    return None
def getRedisConf():
    """Return the parsed contents of ``rankserver.redis.cluster.json``."""
    # Use a context manager so the file handle is closed after parsing.
    with open(CONFIG_DIR + '/rankserver.redis.cluster.json', 'r') as conf_file:
        return json.loads(conf_file.read())
def checkchannel(channel):
    """Return True when accounts of ``channel`` take part in the rankings.

    Channel 6006 is excluded. The rest of this file concatenates the channel
    id straight into redis key strings, so it is handled as a string there;
    comparing against the integer 6006 therefore never matched. The value is
    normalised to ``str`` so both ``6006`` and ``'6006'`` are rejected.
    """
    return str(channel) != '6006'
def info(msg):
    """Print ``msg`` to stdout, prefixed with the current local time and '[INFO] '."""
    print(str(datetime.datetime.now()) + '[INFO] ' + msg)
def take_kills(elem):
    """Sort key for the kill board: element 3 of a ranking entry.

    ``_refreshData`` stores kills divided by games played in that slot.
    """
    return elem[3]
def take_game_times(elem):
    """Sort key for the win board: element 7 of a ranking entry.

    Despite the name, ``_refreshData`` stores the raw ``win_times`` column in
    that slot, so the win board is ordered by number of wins.
    """
    return elem[7]
def take_integral_times(elem):
    """Sort key for the score board: element 8 of a ranking entry.

    ``_refreshData`` stores the ``integral`` column there, or 0 once the
    row's ``season_time`` has passed.
    """
    return elem[8]
def safeDiv(a, b):
    """Return ``a / b``, or 0 when ``b`` equals zero."""
    if b == 0:
        return 0
    return a / b
def getDaySeconds(time_val, incdays, time_zone=8):
    """Return the epoch second at which a local day starts.

    Args:
        time_val: Unix timestamp in seconds.
        incdays: Number of days to add to the day that contains ``time_val``.
        time_zone: Offset from UTC in hours; defaults to 8 (the value the
            original hard-coded).

    Returns:
        The Unix timestamp of local midnight of the selected day.
    """
    offset = time_zone * 3600
    # Shift into local time, truncate to a whole day count, then shift back.
    day_index = int((time_val + offset) / 3600 / 24 + incdays)
    return day_index * 3600 * 24 - offset
#数据去重
def _delRepeatData(row, data_list):
temp_list = []
for data in data_list:
if data[0] == row[0]:
temp_list.append(data)
for temp_data in temp_list:
data_list.remove(temp_data)
# Refresh ranking data
def _refreshData(row, data_list, data_info):
    """Insert one ``user`` row into a ranking list and keep only the top 50.

    Args:
        row: A result row in the column order used by the SELECTs in this
            file: accountid, user_name, avatar_url, kills, alive_time, harm,
            win_times, game_times, idx, integral, season_time.
        data_list: The ranking list to update in place.
        data_info: Key function used to order ``data_list`` (descending).

    The stored entry is ``(accountid, user_name, avatar_url, kills per game,
    alive_time per game, harm per game, wins per game, win_times, integral)``.
    """
    game_times = row[7]
    kill = safeDiv(row[3], game_times)
    alive_time = safeDiv(row[4], game_times)
    harm = safeDiv(row[5], game_times)
    win_times = safeDiv(row[6], game_times)
    # The integral counts as 0 once the row's season_time has passed.
    num = 0 if time.time() > row[10] else row[9]
    raw_name = row[1]
    if not raw_name:
        user_name = ''
    elif isinstance(raw_name, bytes):
        user_name = raw_name.decode('utf-8')
    else:
        # Already text (depends on the column type / driver settings);
        # calling .decode() on a str would raise AttributeError.
        user_name = raw_name
    data_list.append((row[0], user_name, row[2], kill, alive_time, harm,
                      win_times, row[6], num))
    data_list.sort(key=data_info, reverse=True)
    # Deleting an empty tail slice is a no-op, so no length check is needed.
    del data_list[50:]
# Update the leaderboards
def _updateKillRank(r, channel, kill_list):
    """Write the top 50 of ``kill_list`` to redis as the channel's kill board.

    Args:
        r: Redis client; only ``set`` is called on it.
        channel: Channel id, appended to the key.
        kill_list: Ranking entries; sorted in place by ``take_kills``,
            highest first.

    The key is built from ``RANK_KEY_PREFIX`` so that it matches the key
    ``incrementUpdateRank`` reads back, including when the prefix is
    overridden by the config file.
    """
    kill_list.sort(key=take_kills, reverse=True)
    r.set(RANK_KEY_PREFIX + 'kill_rank_' + str(channel),
          json.dumps(kill_list[:50]))
def _updateWinRank(r, channel, win_list):
    """Write the top 50 of ``win_list`` to redis as the channel's win board.

    Args:
        r: Redis client; only ``set`` is called on it.
        channel: Channel id, appended to the key.
        win_list: Ranking entries; sorted in place by ``take_game_times``,
            highest first.

    The key is built from ``RANK_KEY_PREFIX`` so that it matches the key
    ``incrementUpdateRank`` reads back, including when the prefix is
    overridden by the config file.
    """
    win_list.sort(key=take_game_times, reverse=True)
    r.set(RANK_KEY_PREFIX + 'win_rank_' + str(channel),
          json.dumps(win_list[:50]))
def _updateScoreRank(r, channel, integral_list):
    """Write the top 50 of ``integral_list`` to redis as the channel's score board.

    Args:
        r: Redis client; only ``set`` is called on it.
        channel: Channel id, appended to the key.
        integral_list: Ranking entries; sorted in place by
            ``take_integral_times``, highest first.

    The key is built from ``RANK_KEY_PREFIX`` so that it matches the key
    ``incrementUpdateRank`` reads back, including when the prefix is
    overridden by the config file.
    """
    integral_list.sort(key=take_integral_times, reverse=True)
    r.set(RANK_KEY_PREFIX + 'integral_rank_' + str(channel),
          json.dumps(integral_list[:50]))
def fullUpdateRank():
    """Rebuild every channel's kill, win and score board from scratch.

    Scans the whole ``user`` table of each MySQL instance listed in
    ``rankserver.mysql.cluster.json`` in pages of 1000 rows, keeps a top-50
    list per channel and per board, then writes the lists to redis.
    """
    f7.udplog.info('fullUpdateRank begin')
    with open(CONFIG_DIR + '/rankserver.mysql.cluster.json', 'r') as conf_file:
        mysql_conf = json.loads(conf_file.read())
    kill_hash = {}
    win_hash = {}
    integral_hash = {}
    # Paging continues from the largest idx seen, so every page must come
    # back in idx order; without ORDER BY the server may return any 1000
    # matching rows and rows with a smaller idx would be skipped.
    query = ('SELECT accountid, user_name, avatar_url, kills, alive_time,'
             ' harm, win_times, game_times, idx, integral, season_time'
             ' FROM user WHERE idx > %s ORDER BY idx LIMIT 0, 1000')
    for conf in mysql_conf:
        conn = pymysql.connect(host=conf['host'],
                               port=conf['port'],
                               user=conf['user'],
                               passwd=conf['passwd'],
                               db=DBNAME_PREFIX + str(conf['instance_id']),
                               charset='utf8')
        try:
            cursor = conn.cursor()
            try:
                last_idx = 0
                while True:
                    # Let the driver substitute the value instead of
                    # formatting it into the SQL text.
                    cursor.execute(query, (last_idx,))
                    has_data = False
                    for row in cursor:
                        has_data = True
                        last_idx = max(row[8], last_idx)
                        raw_name = row[1]
                        if isinstance(raw_name, bytes):
                            user_name = str(raw_name, encoding='utf8')
                        else:
                            user_name = raw_name or ''
                        if IGNORE_GUEST_ACCOUNT and (user_name == '游客' or user_name == '极乐玩家'):
                            continue
                        # One lookup serves all three boards.
                        channel = f7.getChannelByAccountId(row[0])
                        if not checkchannel(channel):
                            continue
                        # Update the kill board
                        _refreshData(row, kill_hash.setdefault(channel, []), take_kills)
                        # Update the win board
                        _refreshData(row, win_hash.setdefault(channel, []), take_game_times)
                        # Update the score board
                        _refreshData(row, integral_hash.setdefault(channel, []), take_integral_times)
                    # NOTE(review): the original layout did not show whether
                    # this pause ran per row or per page; per page is used.
                    time.sleep(0.001)
                    if not has_data:
                        break
            finally:
                cursor.close()
        finally:
            # Release the connection even when a query fails.
            conn.close()
    r = _getRedis()
    for channel in kill_hash:
        _updateKillRank(r, channel, kill_hash[channel])
    for channel in win_hash:
        _updateWinRank(r, channel, win_hash[channel])
    for channel in integral_hash:
        _updateScoreRank(r, channel, integral_hash[channel])
    f7.udplog.info('fullUpdateRank end')
# Once a day at a fixed time, read the data from mysql, build the
# leaderboards and write them to redis; php then reads redis and returns
# them to the client for display.
def _fullUpdateRank(rushtime):
    """Run ``fullUpdateRank`` as an f7 async task and schedule the next run.

    Args:
        rushtime: Seconds after the start of the day at which the rebuild is
            scheduled.
    """
    def done_callback():
        # Re-arm for the next day's start plus `rushtime`.
        f7.timer.callAt(q7.getDaySeconds(time.time(), 1) + rushtime,
                        lambda: _fullUpdateRank(rushtime))
    f7.app.createAsyncTask(done_callback, fullUpdateRank, ())
# Every 5 minutes, read the rows that changed in mysql and update the
# leaderboards.
def incrementUpdateRank():
    """Merge recently modified ``user`` rows into the boards stored in redis.

    For each MySQL instance listed in ``rankserver.mysql.cluster.json``, rows
    whose ``kill_modifytime`` falls within the last 300 seconds are read in
    pages of 1000. Each channel's boards are loaded from redis the first time
    the channel is seen, the row's previous entry is dropped, the fresh one is
    inserted, and all touched boards are written back at the end.
    """
    with open(CONFIG_DIR + '/rankserver.mysql.cluster.json', 'r') as conf_file:
        mysql_conf = json.loads(conf_file.read())
    r = _getRedis()
    kill_hash = {}
    win_hash = {}
    integral_hash = {}
    # (cache, redis key part, sort key) for each board, in update order.
    boards = (
        (kill_hash, 'kill_rank_', take_kills),
        (win_hash, 'win_rank_', take_game_times),
        (integral_hash, 'integral_rank_', take_integral_times),
    )
    # Fix the window once so every page and every instance uses the same
    # cutoff instead of one that drifts while the scan runs.
    # NOTE(review): the 300 s window equals the scheduling delay, which is
    # counted from the end of the previous run - confirm that rows modified
    # while a run is in progress are not meant to be covered.
    modified_after = time.time() - 300
    # Paging continues from the largest idx seen, so every page must come
    # back in idx order; without ORDER BY rows could be skipped.
    query = ('SELECT accountid, user_name, avatar_url, kills, alive_time,'
             ' harm, win_times, game_times, idx, integral, season_time, kill_modifytime FROM user '
             ' WHERE kill_modifytime > %s AND idx > %s ORDER BY idx LIMIT 0, 1000')
    for conf in mysql_conf:
        conn = pymysql.connect(host=conf['host'],
                               port=conf['port'],
                               user=conf['user'],
                               passwd=conf['passwd'],
                               db=DBNAME_PREFIX + str(conf['instance_id']),
                               charset='utf8')
        try:
            cursor = conn.cursor()
            try:
                last_idx = 0
                while True:
                    # Let the driver substitute the values instead of
                    # formatting them into the SQL text.
                    cursor.execute(query, (modified_after, last_idx))
                    has_data = False
                    for row in cursor:
                        has_data = True
                        channel = f7.getChannelByAccountId(row[0])
                        last_idx = max(row[8], last_idx)
                        raw_name = row[1]
                        if isinstance(raw_name, bytes):
                            user_name = str(raw_name, encoding='utf8')
                        else:
                            user_name = raw_name or ''
                        if IGNORE_GUEST_ACCOUNT and (user_name == '游客' or user_name == '极乐玩家'):
                            continue
                        if not checkchannel(channel):
                            continue
                        for rank_hash, key_part, sort_key in boards:
                            if channel not in rank_hash:
                                # First row for this channel: start from the
                                # board currently stored in redis, if any.
                                stored = r.get(RANK_KEY_PREFIX + key_part + str(channel))
                                rank_hash[channel] = json.loads(stored) if stored else []
                            _delRepeatData(row, rank_hash[channel])
                            _refreshData(row, rank_hash[channel], sort_key)
                    # NOTE(review): the original layout did not show whether
                    # this pause ran per row or per page; per page is used.
                    time.sleep(0.001)
                    if not has_data:
                        break
            finally:
                cursor.close()
        finally:
            # Release the connection even when a query fails.
            conn.close()
    for channel in kill_hash:
        _updateKillRank(r, channel, kill_hash[channel])
    for channel in win_hash:
        _updateWinRank(r, channel, win_hash[channel])
    for channel in integral_hash:
        _updateScoreRank(r, channel, integral_hash[channel])
    f7.udplog.info('incrementUpdateRank end')
# Every 5 minutes, read the rows that changed in mysql and update the
# leaderboards.
def _incrementUpdateRank(rushtime):
    """Run ``incrementUpdateRank`` as an f7 async task and schedule the next run.

    Args:
        rushtime: Delay in seconds, counted from the completion callback,
            before the next incremental update is started.
    """
    def done_callback():
        f7.timer.callLater(rushtime,
                           lambda: _incrementUpdateRank(rushtime))
    f7.app.createAsyncTask(done_callback, incrementUpdateRank, ())
# Load the server config once at import time; it may override the default
# key prefix, database name prefix and guest-account filter defined above.
with open(CONFIG_DIR + '/rankserver.json', 'r') as _conf_file:
    conf = json.loads(_conf_file.read())
RANK_KEY_PREFIX = conf.get('key_prefix', RANK_KEY_PREFIX)
DBNAME_PREFIX = conf.get('dbname_prefix', DBNAME_PREFIX)
IGNORE_GUEST_ACCOUNT = conf.get('ignore_guest_account', IGNORE_GUEST_ACCOUNT)
if __name__ == "__main__":
    q7.xPrint('pid %d' % os.getpid())
    f7.app.init('/data/logs/game2004_rankserver/logs')
    f7.udplog.info('rankserver start pid:' + str(os.getpid()))
    # Incremental update: first run after 300 s, then re-armed with the same
    # delay each time a run completes.
    conf['rushtime'] = 300
    f7.timer.callLater(conf['rushtime'],
                       lambda: _incrementUpdateRank(conf['rushtime']))
    # Full rebuild: 5 * 3600 seconds after the start of the next day.
    conf['day_rushtime'] = 5 * 3600
    f7.timer.callAt(q7.getDaySeconds(time.time(), 1) + conf['day_rushtime'],
                    lambda: _fullUpdateRank(conf['day_rushtime']))
    f7.app.listen(conf['listen_port'])
    f7.app.start()