wjtx/tools/combine_db/script/combine_workflow.py
2021-01-14 17:35:06 +08:00

290 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Usage: python combine_workflow.py before -f ../tools_conf/server_list_1.json   (followed by the `done` and `finish` subcommands)
import click
import fabric
import time
import json
import redis
import os
import datetime
import pymysql
#import pdb
# Connection settings for the ops MySQL instance; read by
# WjtxCombine._change_ops_status when it connects to the 'ywplatform' database.
# NOTE(review): the password is stored in plain text in source control —
# consider loading it from a config file or the environment instead.
# NOTE(review): "db_port" is defined here but the visible connect call does not pass it.
ops_db = {"db_host": "10.10.3.5", "db_port": 3306, "db_user": "root", "db_passwd": "0usmUwROtWjf"}
@click.group()
def main():
    # Root click command group; the `before`, `done` and `finish` subcommands
    # register themselves on it via @main.command().
    # (A comment rather than a docstring, so click's --help output is unchanged.)
    pass
@main.command()
@click.option('-f', '--filename', type=str, help='config filename with json format', required=True)
def before(filename):
    # CLI step 1: build a WjtxCombine and run its pre-merge workflow against
    # the server-list file given with -f/--filename.
    # (A comment rather than a docstring, so click's --help output is unchanged.)
    WjtxCombine().pre_workflow(filename)
@main.command()
def done():
    # CLI step 2: build a WjtxCombine and run the merge itself.
    # (A comment rather than a docstring, so click's --help output is unchanged.)
    WjtxCombine().combine_workflow()
@main.command()
def finish():
    # CLI step 3: call the post-merge hook, then trigger the autoload command
    # through WjtxCombine.monitor_autoload().
    # NOTE(review): after_workflow() is currently a no-op, while after_combine()
    # (restart services, remove the maintain flag) is not called from anywhere
    # in this file — confirm which one this command is meant to run.
    # (A comment rather than a docstring, so click's --help output is unchanged.)
    WjtxCombine().after_workflow()
    WjtxCombine.monitor_autoload()
class WjtxCombine:
    """Drive the server-merge ("combine") workflow for wjtx game servers.

    The workflow is split into three entry points:

    * ``pre_workflow``  - install the server list, validate it, flag the
      servers as under maintenance, stop them and take backups.
    * ``combine_workflow`` - run the merge scripts, rebuild the primary
      database and reload Redis.
    * ``after_combine`` / ``after_workflow`` - post-merge steps.

    All file paths and shell commands are relative to the current working
    directory, so the script must be started from its own directory.
    """

    def __init__(self):
        self.config = "db_conf.json"
        self.game_config = 'server_list.json'
        self._get_global_config()

    # ------------------------------------------------------------------ #
    # small internal helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _require(condition, message):
        """Raise ``AssertionError(message)`` when *condition* is falsy.

        Used instead of ``assert`` statements: the checked expressions in this
        class have side effects (shell commands, backups), and ``assert`` is
        stripped entirely under ``python -O``. The exception type is kept so
        existing callers observe the same failure.
        """
        if not condition:
            raise AssertionError(message)
        return condition

    def _run(self, cmd, what):
        """Run *cmd* via ``os.system`` and fail when it exits non-zero.

        *what* is a short label for the error message; the command itself is
        not echoed there because several commands embed passwords.
        """
        self._require(os.system(cmd) == 0, f"{what} failed")

    def _ensure_game_config(self):
        """Load the server list once if an earlier step has not done so."""
        if not hasattr(self, 'primary_serverid'):
            self._get_game_config()

    @staticmethod
    def _parse_hefu_members(raw):
        """Turn one ``hefu_info`` hash value into a list of server ids.

        NOTE(review): the stored format is not visible in this file. A JSON
        list/dict/scalar is accepted, with a comma-separated string as the
        fallback - confirm against whatever writes ``hefu_info``.
        """
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            return [part.strip() for part in str(raw).split(',') if part.strip()]
        if isinstance(parsed, (list, tuple, dict)):
            return list(parsed)
        return [parsed]

    def _archive_previous_output(self, out_dir='out'):
        """Move everything already in *out_dir* into a timestamped subfolder.

        Returns the archive directory path, or ``None`` when *out_dir* was
        empty. Done in Python instead of ``mv * <dir>``: the shell glob also
        matched the freshly created archive directory, so ``mv`` tried to move
        it into itself and exited non-zero.
        """
        entries = os.listdir(out_dir)
        if not entries:
            return None
        now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        archive_dir = os.path.join(out_dir, now)
        os.makedirs(archive_dir, exist_ok=True)
        for name in entries:
            if name == now:
                # Archive folder from a run within the same second.
                continue
            os.rename(os.path.join(out_dir, name), os.path.join(archive_dir, name))
        return archive_dir

    # ------------------------------------------------------------------ #
    # configuration
    # ------------------------------------------------------------------ #
    def _get_global_config(self):
        """Read ``../tools_conf/<self.config>`` and connect to the global Redis.

        Sets ``redis_config``, ``mysql_config`` and ``global_redis_conn``.

        Raises:
            AssertionError: if the ``global_redis`` or ``mysql`` section is
                missing or empty.
        """
        with open('../tools_conf/' + self.config, 'r') as f:
            global_config = json.load(f)
        self.redis_config = global_config.get('global_redis')
        self.mysql_config = global_config.get('mysql')
        # Validate before the sections are indexed below.
        self._require(self.redis_config and self.mysql_config,
                      "db config must define 'global_redis' and 'mysql'")
        self.global_redis_conn = redis.Redis(host=self.redis_config['redis_host'],
                                             port=self.redis_config['redis_port'],
                                             charset='utf8', db=self.redis_config.get('db', 0),
                                             decode_responses=True)

    def _get_game_config(self):
        """Read ``../tools_conf/<self.game_config>`` (a JSON list of servers).

        The first entry is the primary (merge target). Sets ``server_config``,
        ``serverids``, ``primary_redis_ip``, ``primary_redis_port`` and
        ``primary_serverid`` (as a string).

        Returns:
            True on success, so callers can check the result.

        Raises:
            AssertionError: if the list is empty or the primary entry lacks
                ``serverid``, ``redis_host`` or ``redis_port``.
        """
        with open('../tools_conf/' + self.game_config, 'r') as f:
            self.server_config = json.load(f)
        self._require(self.server_config, "server list is empty")
        primary = self.server_config[0]
        self.primary_redis_ip = primary.get('redis_host')
        self.primary_redis_port = primary.get('redis_port')
        primary_serverid = primary.get('serverid')
        # Check the raw value: str(None) == "None" would be truthy.
        self._require(primary_serverid is not None and self.primary_redis_port and self.primary_redis_ip,
                      "primary server entry needs serverid, redis_host and redis_port")
        self.primary_serverid = f"{primary_serverid}"
        self.serverids = [record.get('serverid') for record in self.server_config]
        return True

    # ------------------------------------------------------------------ #
    # workflows
    # ------------------------------------------------------------------ #
    def after_workflow(self):
        """Post-merge hook; currently does nothing.

        NOTE(review): ``after_combine`` holds the actual post-merge steps but
        is not called from anywhere in this file - confirm the intended wiring.
        """
        pass

    def pre_workflow(self, filename):
        """Prepare the servers listed in *filename* for merging.

        Steps: copy *filename* to ``server_list.json``, load and validate the
        server list, flag the servers as under maintenance, remove their
        monitor rows, kill the game processes, then back up MySQL and Redis.

        Returns:
            True when every step succeeded.

        Raises:
            AssertionError: as soon as one step reports failure.
        """
        # change server_list_1.json to server_list.json
        # NOTE(review): this copies into the current directory, while
        # _get_game_config reads ../tools_conf/server_list.json - confirm both
        # refer to the same file in the deployed layout.
        change_config_cmd = f"echo 'y'|cp {filename} server_list.json -f"
        self._run(change_config_cmd, "installing server_list.json")
        # init server_json.json config
        self._require(self._get_game_config(), "loading game config failed")
        self._require(self._check_config(), "server list does not match 1009.csv")
        self._require(self._get_all_serverids(), "collecting server ids failed")
        # change service status to maintain
        self._require(self._change_service_status(), "setting maintain status failed")
        self._require(self._change_ops_status(), "updating ops monitor failed")
        # kill service which in server_json.json
        self._require(self._kill_service(), "killing game services failed")
        time.sleep(10)
        self._require(self._back_mysqldb(), "mysql backup failed")
        self._require(self._back_game_redis(), "game redis backup failed")
        return True

    def _change_service_status(self, is_add=True):
        """Add (``is_add=True``) or remove the maintain flag for ``all_serverids``.

        Shells out to ``wjtx.py change_server_status``.

        Returns:
            True if the command exited with status 0, else False.
        """
        serverids = ",".join(str(i) for i in self.all_serverids)
        operation = "add" if is_add else "remove"
        cmd = f"python wjtx.py change_server_status -m maintain -o {operation} -s {serverids}"
        print(f"change server status with {cmd}")
        return os.system(cmd) == 0

    def _kill_service(self):
        """SSH to every configured host and ``kill -9`` its java game process.

        Returns:
            True when every server's process is gone after the kill.

        Raises:
            AssertionError: if no matching process is found on a host.
            Exception: if a matching process is still listed after the kill.
        """
        # step kill , check
        for item in self.server_config:
            serverid = item['serverid']
            ip = item['redis_host']
            # `grep -v grep` drops the grep processes and the remote shell that
            # runs this very pipeline (its command line contains both "java"
            # and the server id), which would otherwise count as survivors.
            get_pid = f"ps -ef|grep java|grep {serverid}|grep -v grep|cut -c 9-15"
            with fabric.Connection(ip) as c:
                # split() handles the padded, newline-separated PID column.
                pids = c.run(get_pid).stdout.split()
                self._require(pids, f"no running java process for server {serverid} on {ip}")
                run_pids = " ".join(pids)
                kill_cmd = f"kill -9 {run_pids}"
                print(kill_cmd)
                c.run(kill_cmd)
                time.sleep(20)
                check = c.run(get_pid).stdout.strip()
            if check:
                raise Exception(f"kill service {ip} Failed!\t{check}")
        return True

    # ------------------------------------------------------------------ #
    # backups
    # ------------------------------------------------------------------ #
    def _back_global_redis(self):
        """Dump the global Redis to ``back/global_<timestamp>.rdb``.

        Returns:
            True if ``redis-cli --rdb`` exited with status 0, else False.
        """
        now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        redis_ip = self.redis_config['redis_host']
        redis_port = self.redis_config['redis_port']
        serverid = 'global'
        back_redis_cmd = f"redis-cli -h {redis_ip} -p {redis_port} --rdb back/{serverid}_{now}.rdb "
        return os.system(back_redis_cmd) == 0

    def _back_game_redis(self):
        """Dump each server's Redis to ``back/<serverid>_<timestamp>.rdb``.

        Every server is attempted even after a failure.

        Returns:
            True only if all dumps succeeded.
        """
        status = True
        for item in self.server_config:
            redis_ip = item['redis_host']
            redis_port = item['redis_port']
            serverid = item['serverid']
            now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            back_redis_cmd = f"redis-cli -h {redis_ip} -p {redis_port} --rdb back/{serverid}_{now}.rdb "
            if os.system(back_redis_cmd) != 0:
                status = False
        return status

    def _back_mysqldb(self):
        """``mysqldump`` each ``legend_<serverid>`` database into ``back/``.

        Every server is attempted even after a failure.

        Returns:
            True only if all dumps succeeded.
        """
        # NOTE(review): host and credentials are hard-coded in the command
        # rather than taken from self.mysql_config.
        status = True
        for serverid in self.serverids:
            now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            cmd = f"mysqldump --default-character-set=utf8 -uwjtx -pmMWA4DKCfeOL -h10.10.6.2 legend_{serverid} >back/legend_{serverid}_{now}.sql"
            if os.system(cmd) != 0:
                status = False
        return status

    # ------------------------------------------------------------------ #
    # checks
    # ------------------------------------------------------------------ #
    def _check_config(self):
        """Compare the server list with ``../tools_conf/1009.csv``.

        Each non-empty line of the file is split on whitespace into
        ``serverid host port``; lines with fewer than three fields are skipped.

        Returns:
            True when every configured server appears in the file with the
            same Redis host and port; False otherwise (a server missing from
            the file counts as a mismatch).
        """
        server_lists = {}
        with open('../tools_conf/1009.csv', 'r') as f:
            for line in f:
                fields = line.split()
                if len(fields) < 3:
                    continue
                server_lists[fields[0]] = {'host': fields[1], 'port': fields[2]}
        for item in self.server_config:
            expected = server_lists.get(str(item.get('serverid')))
            if expected is None:
                return False
            if str(item.get('redis_port')) != str(expected['port']):
                return False
            if str(item.get('redis_host')) != str(expected['host']):
                return False
        return True

    def _check_combine_status(self):
        """Check each server's ``tcpPort`` in the global ``game_info`` hash.

        A server passes when its port equals ``9000 + serverid % 100``.

        Returns:
            True when every server passes; False on a mismatch or when the
            hash entry or its ``tcpPort`` field is missing.
        """
        for serverid in self.serverids:
            raw = self.global_redis_conn.hget('game_info', serverid)
            if not raw:
                return False
            game_port = json.loads(raw).get('tcpPort')
            if game_port is None or int(game_port) != 9000 + int(serverid) % 100:
                return False
        return True

    def _get_all_serverids(self):
        """Collect the ids of every server affected by this merge.

        That is each configured server plus the ids found in its ``hefu_info``
        entry in the global Redis. The result is stored in
        ``self.all_serverids`` as a de-duplicated list of strings, ordered
        numerically for digit-only ids.

        NOTE(review): configured servers are included even when they have no
        ``hefu_info`` entry, on the assumption that they must still be put
        into maintenance - confirm.

        Returns:
            True.
        """
        all_serverids = set()
        for serverid in self.serverids:
            all_serverids.add(str(serverid))
            hefu_info = self.global_redis_conn.hget('hefu_info', serverid)
            if hefu_info:
                all_serverids.update(str(member) for member in self._parse_hefu_members(hefu_info))
        # (len, text) sorts digit strings in numeric order.
        self.all_serverids = sorted(all_serverids, key=lambda sid: (len(sid), sid))
        return True

    def _change_ops_status(self):
        """Delete the ``ops_servicemonitor`` rows for the configured hosts.

        Best effort: a database error is printed and rolled back, and the
        method still returns True.

        Returns:
            True.
        """
        ips = [str(item['redis_host']) for item in self.server_config]
        if not ips:
            # "in ()" is not valid SQL and there is nothing to delete anyway.
            return True
        placeholders = ", ".join(["%s"] * len(ips))
        remove_monitor = f"delete from ops_servicemonitor where up_addr in ({placeholders})"
        # Keyword arguments: newer PyMySQL releases reject positional ones.
        db = pymysql.connect(host=ops_db['db_host'], port=ops_db.get('db_port', 3306),
                             user=ops_db['db_user'], password=ops_db['db_passwd'],
                             database='ywplatform')
        try:
            with db.cursor() as cursor:
                cursor.execute(remove_monitor, ips)
            db.commit()
        except Exception as e:
            print(f"run {remove_monitor} error,{e}")
            db.rollback()
        finally:
            db.close()
        # todo remark combine service ,table dismiss row
        return True

    def _clear_dest_redis(self):
        """Pipe ``clear_redis.txt`` into ``redis-cli`` on the primary Redis.

        Returns:
            True if the shell pipeline exited with status 0, else False.
        """
        clear_redis = f"cat clear_redis.txt|redis-cli -h {self.primary_redis_ip} -p {self.primary_redis_port}"
        return os.system(clear_redis) == 0

    def _check_mysql_combine(self):
        """Print per-``server_id`` user counts and wait for confirmation.

        A query error is printed but not raised; the operator prompt follows
        either way.

        Returns:
            True when the operator confirmed.

        Raises:
            AssertionError: if the operator does not answer "y".
        """
        # wait for user double check
        db = pymysql.connect(host=self.mysql_config['db_host'], user=self.mysql_config['db_user'],
                             password=self.mysql_config['db_passwd'],
                             database='legend_' + self.primary_serverid)
        user_group_num = "select count(1),server_id from user group by server_id;"
        try:
            with db.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
                cursor.execute(user_group_num)
                # fetchall: the query yields one row per server_id and the
                # operator needs to see all of them.
                row_data = cursor.fetchall()
            print(f"check combine user\n {row_data}")
        except Exception as e:
            print(f"run {user_group_num} error,{e}")
        finally:
            db.close()
        self._require(self._double_check(), "operator rejected the merged user counts")
        return True

    def _double_check(self):
        """Prompt the operator for Y/N; True only for "y" (any case)."""
        double_check = input("请输入Y/N")
        return double_check.strip().lower() == 'y'

    def combine_workflow(self):
        """Merge the databases and Redis data into the primary server.

        Steps: archive old files in ``out``, run ``combine_db.py`` and
        ``combine_redis.py``, ask for confirmation, drop and recreate the
        primary databases, restore ``out/all.sql``, show user counts for a
        second confirmation, clear the primary Redis, import the game Redis
        script, back up the global Redis and import the global Redis script.

        Returns:
            True when every step succeeded.

        Raises:
            AssertionError: when a step fails or the operator declines.
        """
        # A fresh instance (the `done` command) has not read the server list.
        self._ensure_game_config()
        self._archive_previous_output()
        self._run("python combine_db.py", "combine_db.py")
        self._run("python combine_redis.py", "combine_redis.py")
        print(f"ready remove legend_{self.primary_serverid}!")
        self._require(self._double_check(), "operator declined dropping the primary databases")
        drop_mysql = f"mysql -uroot -p0usmUwROtWjf -h10.10.6.2 -e'drop database legend_{self.primary_serverid};drop database logger_{self.primary_serverid}'"
        self._run(drop_mysql, "dropping primary databases")
        print(f"drop db with {drop_mysql}")
        create_db = f"python wjtx.py init_wjtx_mysql -s {self.primary_serverid}"
        self._run(create_db, "re-creating primary databases")
        print(f"create db with {create_db}")
        restore_db = f"mysql -uroot -p0usmUwROtWjf -h10.10.6.2 legend_{self.primary_serverid} -e'source out/all.sql'"
        self._run(restore_db, "restoring out/all.sql")
        print(f"restore db with {restore_db}")
        self._check_mysql_combine()
        self._require(self._clear_dest_redis(), "clearing primary redis failed")
        restore_game_redis = f"python import_redis.py {self.primary_redis_ip} {self.primary_redis_port} out/game_redis.script"
        self._run(restore_game_redis, "importing game redis")
        self._require(self._back_global_redis(), "global redis backup failed")
        # The global redis section uses redis_host/redis_port keys.
        restore_global_redis = f"python import_redis.py {self.redis_config['redis_host']} {self.redis_config['redis_port']} out/global_redis.script"
        self._run(restore_global_redis, "importing global redis")
        print(f"run with {restore_game_redis}\n{restore_global_redis}")
        return True

    def _start_service(self):
        """Restart the primary server's stage and game services over SSH.

        Runs ``sh restart <primary_serverid>`` in each service directory.

        Returns:
            True.
        """
        # Connect to the primary host (not its redis port), as _kill_service does.
        with fabric.Connection(self.primary_redis_ip) as c:
            for item in ('game1009stage', 'game1009game'):
                with c.cd('/data/apps/' + item):
                    c.run(f"sh restart {self.primary_serverid}")
        return True

    @staticmethod
    def monitor_autoload():
        """Run ``python wjtx.py autoload``; True if it exited with status 0."""
        cmd = "python wjtx.py autoload"
        return os.system(cmd) == 0

    def after_combine(self):
        """Restart the primary services and remove the maintain flag.

        Loads the server list and the affected server ids first when an
        earlier step on this instance has not done so.

        Returns:
            True.
        """
        self._ensure_game_config()
        if not hasattr(self, 'all_serverids'):
            self._get_all_serverids()
        self._start_service()
        self._change_service_status(is_add=False)
        return True
# Dispatch to the click command group only when run as a script.
if __name__ == "__main__":
    main()