wjtx/tools/combine_db/script/combine_redis.py
2020-12-23 14:49:46 +08:00

275 lines
10 KiB
Python

# -*- coding: utf-8 -*-
#!/usr/bin/python
import os
import sys
import time
import json
import redis
import pymysql
import datetime
import pprint
import _utils
from string import Template
from enum import Enum, unique
# Rule "process_type" values understood by Combine.

# Plain string keys: integer values are summed across servers.
kHtKeyAdd = 'key_add'
kHtKeyStarAdd = 'key_star_add'

# Hash keys: fields from every server are merged into one hash.
kHtHashUnion = 'hash_union'

# Sorted-set keys: members from every server are merged, either for the exact
# key name or for every key matching "<key_name>*".
kHtZSetUnion = 'zset_union'
kHtZSetStarUnion = 'zset_star_union'

# Sorted-set keys whose name carries the server id; they are merged into the
# key that carries the main server's id.
kHtSpec1ZSetUnion = 'spec1_zset_union'
kHtSpec1ZSetStarUnion = 'spec1_zset_star_union'
class Combine:
    """Merge the Redis data of several game servers into command scripts.

    Every rule returned by ``_utils.readRedisRule()`` names a key (or key
    prefix), the Redis database it lives in (``'game'`` or ``'global'``) and a
    ``process_type``.  For each rule the matching data is read from every
    server in ``_utils.readServerListConf()``, merged in memory, and written
    out as one JSON-encoded command list per line (for example
    ``["zadd", key, score, member]``).  Game rules go to
    ``out/game_redis.script`` and global rules to ``out/global_redis.script``.

    The first entry of the server list is the main server: keys that embed a
    server id are rewritten to carry the main server's id.
    """

    # Number of elements requested per SCAN / ZSCAN round trip.
    _SCAN_COUNT = 1000

    def __init__(self):
        """Load the configuration and open both output script files.

        Raises:
            OSError: if either output file cannot be opened for writing.
        """
        self._rule_conf = _utils.readRedisRule()
        self._server_list_conf = _utils.readServerListConf()
        self._global_redis_conf = _utils.getGlobalRedisConf()
        self._main_serverid = self._server_list_conf[0]['serverid']

        # State of the rule / server currently being processed.
        self._curr_serverid = 0
        self._curr_rule = None
        self._curr_file = None
        self._curr_deal_num = 0  # commands written for the current rule

        # Per-rule accumulators, reset by _beginRule() and _endRule().
        self._hash_data = {}
        self._single_value = 0

        self._local_file_name = 'out/game_redis.script'
        self._global_file_name = 'out/global_redis.script'
        self._local_file = open(self._local_file_name, 'w')
        try:
            self._global_file = open(self._global_file_name, 'w')
        except OSError:
            # Do not leak the first handle when the second open fails.
            self._local_file.close()
            raise

    def run(self):
        """Process all game rules, then all global rules, then close the files.

        The output files are closed even when a conversion step raises.
        """
        try:
            self._convertRedis()
            self._convertGlobalRedis()
        finally:
            self._local_file.close()
            self._global_file.close()

    def _convertRedis(self):
        """Merge every 'game' rule across the per-server Redis instances."""
        for rule in self._rule_conf:
            print(rule)
            if rule['redis_db'] != 'game' or rule['key_name'] == '':
                continue
            self._curr_deal_num = 0
            self._curr_rule = rule
            self._curr_file = self._local_file
            self._beginRule()
            for conf in self._server_list_conf:
                # NOTE(review): conf may carry a 'redis_passwd' entry, but the
                # connection is made without a password.
                conn = redis.Redis(
                    host=conf['redis_host'],
                    port=conf['redis_port'],
                    db=conf.get('redis_db', '0'),
                    decode_responses=True,
                )
                self._curr_serverid = conf['serverid']
                self._procOneRedis(conn)
            self._endRule()

    def _convertGlobalRedis(self):
        """Merge every 'global' rule using the single global Redis instance.

        The same connection is processed once per configured server so that
        rule types whose key name embeds the server id see every id.
        """
        for rule in self._rule_conf:
            if rule['redis_db'] != 'global' or rule['key_name'] == '':
                continue
            self._curr_deal_num = 0
            self._curr_rule = rule
            self._curr_file = self._global_file
            self._beginRule()
            # NOTE(review): the global connection is also made without a
            # password, whatever the config contains.
            conn = redis.Redis(
                host=self._global_redis_conf['redis_host'],
                port=self._global_redis_conf['redis_port'],
                db=self._global_redis_conf.get('redis_db', '0'),
                decode_responses=True,
            )
            # NOTE(review): because the same connection is read once per
            # server, the additive types (key_add / key_star_add) would count
            # each global value once per server - confirm no global rule uses
            # them.
            for conf in self._server_list_conf:
                self._curr_serverid = conf['serverid']
                self._procOneRedis(conn)
            self._endRule()

    def _beginRule(self):
        """Reset the accumulators before a new rule is processed."""
        self._hash_data = {}
        self._single_value = 0

    def _endRule(self):
        """Write the merged result of the current rule and print a summary.

        A rule whose ``process_type`` has no writer produces no commands; the
        summary line is printed either way.
        """
        # Built at call time so the table always binds the current instance.
        writers = {
            kHtHashUnion: self._endHashUnion,
            kHtZSetStarUnion: self._endZSetStarUnion,
            kHtZSetUnion: self._endZSetUnion,
            kHtSpec1ZSetUnion: self._endSpec1ZSetUnion,
            kHtSpec1ZSetStarUnion: self._endSpec1ZSetStarUnion,
            kHtKeyStarAdd: self._endKeyStarAdd,
            kHtKeyAdd: self._endKeyAdd,
        }
        writer = writers.get(self._curr_rule['process_type'])
        if writer is not None:
            writer()
        self._hash_data = {}
        self._single_value = 0
        print('%-18s %-8s %s' % (
            self._curr_rule['process_type'],
            self._curr_deal_num,
            self._curr_rule['key_name']
        ))

    def _procOneRedis(self, conn):
        """Read the current rule's data from one connection into the accumulators.

        Args:
            conn: Redis client to read from.

        Raises:
            AssertionError: if the rule's ``process_type`` is neither a known
                type nor blank.
        """
        readers = {
            kHtHashUnion: self._procHashUnion,
            kHtZSetStarUnion: self._procZSetStarUnion,
            kHtZSetUnion: self._procZSetUnion,
            kHtSpec1ZSetUnion: self._procSpec1ZSetUnion,
            kHtSpec1ZSetStarUnion: self._procSpec1ZSetStarUnion,
            kHtKeyStarAdd: self._procKeyStarAdd,
            kHtKeyAdd: self._procKeyAdd,
        }
        process_type = self._curr_rule['process_type']
        reader = readers.get(process_type)
        if reader is None:
            # A blank process type means "nothing to do"; anything else is a
            # misspelled or unsupported rule.
            assert process_type.strip() == '', \
                'unknown process_type: %r' % (process_type,)
            return
        reader(conn)

    def _scanRedisKey(self, conn, pattern):
        """Return the keys matching ``pattern`` as a dict mapping key -> 0.

        SCAN may report a key more than once; building a dict removes the
        duplicates while keeping first-seen order.
        """
        return dict.fromkeys(
            conn.scan_iter(match=pattern, count=self._SCAN_COUNT), 0)

    def _zscanPairs(self, conn, key):
        """Iterate over every (member, score) pair of the sorted set ``key``."""
        return conn.zscan_iter(key, match='*', count=self._SCAN_COUNT)

    def _procHashUnion(self, conn):
        """Merge all fields of the rule's hash; later servers overwrite earlier ones."""
        self._hash_data.update(conn.hgetall(self._curr_rule['key_name']))

    def _procZSetStarUnion(self, conn):
        """Merge every sorted set whose key matches "<key_name>*", per key."""
        for key in self._scanRedisKey(conn, self._curr_rule['key_name'] + '*'):
            members = self._hash_data.setdefault(key, {})
            members.update(self._zscanPairs(conn, key))

    def _procZSetUnion(self, conn):
        """Merge the members of the sorted set named exactly ``key_name``."""
        self._hash_data.update(
            self._zscanPairs(conn, self._curr_rule['key_name']))

    def _procSpec1ZSetUnion(self, conn):
        """Merge the sorted set named "<key_name><current server id>"."""
        key = self._curr_rule['key_name'] + str(self._curr_serverid)
        self._hash_data.update(self._zscanPairs(conn, key))

    def _procSpec1ZSetStarUnion(self, conn):
        """Merge every sorted set matching "<key_name>*_<current server id>"."""
        pattern = self._curr_rule['key_name'] + '*_' + str(self._curr_serverid)
        for key in self._scanRedisKey(conn, pattern):
            members = self._hash_data.setdefault(key, {})
            members.update(self._zscanPairs(conn, key))

    def _procKeyStarAdd(self, conn):
        """Sum the integer value of every "giftbag_*" key into the accumulator.

        Values are converted with ``int`` before being added, because
        connections created with ``decode_responses=True`` return them as
        ``str``.  A key that vanished between the scan and the read is skipped.
        """
        assert self._curr_rule['key_name'] == 'giftbag_'
        # _scanRedisKey de-duplicates, so each key is counted once per server.
        for key in self._scanRedisKey(conn, self._curr_rule['key_name'] + '*'):
            num = conn.get(key)
            if num is None:
                continue
            self._hash_data[key] = self._hash_data.get(key, 0) + int(num)

    def _procKeyAdd(self, conn):
        """Add the integer value of "group_purchase_server" to the running total."""
        assert self._curr_rule['key_name'] == 'group_purchase_server'
        num = conn.get(self._curr_rule['key_name'])
        if num is not None:
            self._single_value += int(num)

    def _endHashUnion(self):
        """Emit one ``hset`` command per merged hash field."""
        for field, value in self._hash_data.items():
            self._writeCmdToFile(
                ['hset', self._curr_rule['key_name'], field, value])

    def _endZSetStarUnion(self):
        """Emit one ``zadd`` command per member of every merged sorted set."""
        for zset_key, members in self._hash_data.items():
            for member, score in members.items():
                self._writeCmdToFile(['zadd', zset_key, score, member])

    def _endZSetUnion(self):
        """Emit one ``zadd`` command per merged member of the rule's sorted set."""
        for member, score in self._hash_data.items():
            self._writeCmdToFile(
                ['zadd', self._curr_rule['key_name'], score, member])

    def _endSpec1ZSetUnion(self):
        """Emit ``zadd`` commands targeting "<key_name><main server id>"."""
        target = self._curr_rule['key_name'] + str(self._main_serverid)
        for member, score in self._hash_data.items():
            self._writeCmdToFile(['zadd', target, score, member])

    def _endSpec1ZSetStarUnion(self):
        """Emit ``zadd`` commands with each key's server id set to the main server's.

        Keys look like ``sqhw_score_${curr_season_id}_${world_id}``.  To merge
        them into the main server, the trailing ``world_id`` is replaced with
        the main server id.

        Raises:
            AssertionError: if a collected key contains no ``'_'`` separator.
        """
        for zset_key, members in self._hash_data.items():
            # Only the final "_<id>" segment is the server id.
            prefix, separator, _server_id = zset_key.rpartition('_')
            assert separator, 'key has no server id suffix: %r' % (zset_key,)
            target = prefix + '_' + str(self._main_serverid)
            for member, score in members.items():
                self._writeCmdToFile(['zadd', target, score, member])

    def _endKeyStarAdd(self):
        """Emit one ``set`` command per summed key."""
        for key, total in self._hash_data.items():
            self._writeCmdToFile(['set', key, total])

    def _endKeyAdd(self):
        """Emit a single ``set`` command carrying the summed value."""
        self._writeCmdToFile(
            ['set', self._curr_rule['key_name'], self._single_value])

    def _writeCmdToFile(self, cmdlist):
        """Append ``cmdlist`` as one JSON line to the current output file.

        Also increments the per-rule command counter shown in the summary.
        """
        assert isinstance(cmdlist, list)
        self._curr_deal_num += 1
        self._curr_file.write(json.dumps(cmdlist) + '\n')
# Run the merge only when this file is executed as a script, so that importing
# the module does not open the output files or contact Redis.
if __name__ == '__main__':
    combine = Combine()
    combine.run()