diff --git a/config/config.py b/config/config.py index 2c72808..75dce58 100644 --- a/config/config.py +++ b/config/config.py @@ -24,5 +24,12 @@ class Settings(BaseSettings): root_path_in_servers: Optional[bool] = False root_path: str = '/api/v1' + origins: list = [ + "http://*.kingsome.cn", + "https://*.kingsome.cn", + "http://localhost", + "http://localhost:8080", + ] + settings = Settings() diff --git a/jobs.sqlite b/jobs.sqlite index e539271..b369870 100644 Binary files a/jobs.sqlite and b/jobs.sqlite differ diff --git a/jobs/jobs.py b/jobs/jobs.py index 8e193be..90f4b4b 100644 --- a/jobs/jobs.py +++ b/jobs/jobs.py @@ -38,3 +38,7 @@ def job_execute(event): def aps_test1(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), json.dumps(x)) logger.info("run appt_test1,msg={}".format(json.dumps(x))) + + +def app_test01(msg: str = "hi"): + logger.info(f"run with {msg}") diff --git a/jobs/route.py b/jobs/route.py index 2d28478..638dc22 100644 --- a/jobs/route.py +++ b/jobs/route.py @@ -24,6 +24,10 @@ def cron_task(a1: str) -> None: print(a1, time.strftime("'%Y-%m-%d %H:%M:%S'")) +def app_test01(msg: str = "hi"): + logger.info(f"run with {msg}") + + @router.get("/all", tags=["schedule"], summary="获取所有job信息") async def get_scheduled_syncs(): """ @@ -55,16 +59,15 @@ async def get_target_sync( # interval 固定间隔时间调度 @router.post("/interval/schedule/", tags=["schedule"], summary="开启定时:间隔时间循环") -async def add_interval_job( - seconds: int = Body(120, title="循环间隔时间/秒,默认120s", embed=True), - job_id: str = Body(..., title="任务id", embed=True), - run_time: int = Body(time.time(), title="第一次运行时间", - description="默认立即执行", embed=True) -): +async def add_interval_job(seconds: int = Body(120, title="循环间隔时间/秒,默认120s", embed=True), + job_id: str = Body(..., title="任务id", embed=True), + run_time: int = Body(time.time(), title="第一次运行时间", + description="默认立即执行", embed=True) + ): res = Schedule.get_job(job_id=job_id) if res: return resp_fail(msg=f"{job_id} job already exists") - schedule_job = Schedule.add_job(cron_task, + schedule_job = Schedule.add_job(app_test01, 'interval', args=(job_id,), seconds=seconds, # 循环间隔时间 秒 diff --git a/main.py b/main.py index d80497d..c5d90dc 100644 --- a/main.py +++ b/main.py @@ -4,13 +4,16 @@ from fastapi import Depends, FastAPI, BackgroundTasks, Request from dependencies import get_token_header from ops.common import route as common_route from ops.deploy import route as deploy_route +from ops.wjtx import route as wjtx_route from jobs import route as jobs_route from typing import Optional from scripts.common.redis import get_redis_pool import pdb from scripts.logger import logger -from apscheduler.events import EVENT_JOB_EXECUTED -from jobs.jobs import Schedule, job_execute +from fastapi.middleware.cors import CORSMiddleware + +# from apscheduler.events import EVENT_JOB_EXECUTED +# from jobs.jobs import Schedule, job_execute tags_metadata = [ # { # "name": "common", @@ -32,18 +35,26 @@ def create_app(): openapi_tags=tags_metadata) application.include_router(common_route.router, prefix="/common") application.include_router(deploy_route.router, prefix="/deploy") - application.include_router(jobs_route.router, prefix="/jobs") + application.include_router(wjtx_route.router, prefix="/wjtx") + # application.include_router(jobs_route.router, prefix="/jobs") return application app = create_app() +app.add_middleware( + CORSMiddleware, + allow_origins=settings.origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) @app.on_event("startup") async def startup_event(): app.state.redis = await get_redis_pool() - Schedule.start() - Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED) + # Schedule.start() + # Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED) @app.on_event("shutdown") diff --git a/ops/wjtx/route.py b/ops/wjtx/route.py new file mode 100644 index 0000000..5a4e356 --- /dev/null +++ b/ops/wjtx/route.py @@ -0,0 +1,21 @@ +from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form +from starlette.requests import Request +from starlette.responses import JSONResponse +from config.config import settings +from scripts.wjtx.tools import WjtxTools +from uuid import uuid4 +router = APIRouter() + + +@router.post("/change_starttime", tags=["wjtx"], summary="修改服务器启动时间") +async def change_starttime(request: Request, serverid: int, starttime: str) -> JSONResponse: + wj = WjtxTools() + await wj.change_start_time(serverid, starttime) + return JSONResponse(status_code=200, content={"message": "restart server {0},starttime={1}!".format(serverid, starttime)}) + + +@router.post("/change_worldboss", tags=["wjtx"], summary="修改世界boss血量配置") +async def change_worldboss(request: Request, serverid: int, values: int) -> JSONResponse: + wj = WjtxTools() + await wj.change_worldboss(serverid, values) + return JSONResponse(status_code=200, content={"message": "change {0} worldboss to {1}!".format(serverid, values)}) diff --git a/ops/wjtx/wjtx.py b/ops/wjtx/wjtx.py deleted file mode 100644 index ae52407..0000000 --- a/ops/wjtx/wjtx.py +++ /dev/null @@ -1,30 +0,0 @@ -from fastapi import APIRouter, Depends, HTTPException -from ...dependencies import get_token_header - -router = APIRouter( - prefix="/wjtx", - tags=["wjtx"], - dependencies=[Depends(get_token_header)], - responses={404: {"description": "Not found"}}, - -) - - -@router.get("/") -async def read_items(): - return {"msg": "1"} - - -# @router.get("/{itemid}") -# async def read_item(item_id: str): -# if item_id not in fake_item_db: -# raise HTTPException(status_code=404, detail="Item not found") -# return {"name": fake_item_db[item_id]["name"], "item_id": item_id} - - -# @router.put("/{item_id}", tags=["custom"], responses={403: {"description": "Operation forbidden"}}) -# async def update_item(item_id: str): -# if item_id != "pulmbus": -# raise HTTPException( -# status_code=403, detail="You can only update the item plumbus") -# return {"item_id": item_id, "name": "The great plumbus"} diff --git a/scripts/common/deploy.py b/scripts/common/deploy.py index cf49b19..06d8196 100644 --- a/scripts/common/deploy.py +++ b/scripts/common/deploy.py @@ -8,7 +8,9 @@ from fastapi import HTTPException, Request from . import run_cmd from .ansible import AnsibleAPI, write_host from scripts.logger import logger -from loguru import logger as base_logger, os, time +from loguru import logger as base_logger +import os +import time class ProjectInfo(BaseModel): diff --git a/scripts/common/mymysql.py b/scripts/common/mymysql.py new file mode 100644 index 0000000..efda7a5 --- /dev/null +++ b/scripts/common/mymysql.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +""" +date: 2016/07/11 +role: mysql的增删改查类 +usage: m = mysqlBase(host='xxx',db='xxx',user='xxx',pwd='xxx') 实例化 + m.insert('core',{'host_name':'ccc','process_name':'ddd','ip_addr':'192.168.136.41','status':'4'}) + m.update('table',{'field1':'value1','field2':'value2'},'id=1') 更新表名 字段名:值 条件 + m.delete('core','status=5 and id=12') + m.change("update core set a='aaa' where id=1") 可以多条插入 + m.query("select * from core") +""" +from scripts.logger import logger as log +import warnings + + +import pymysql as MySQLdb + + +# mysql操作类 +class MysqlBase: + + # 连接数据库 + def __init__(self, **args): + + # 获取参数 + self.host = args.get('host', 'localhost') + self.user = args.get('user') + self.pswd = args.get('pswd') + self.db = args.get('db', 'mysql') + self.port = args.get('port', '3306') + self.charset = args.get('charset', 'utf8') + + try: + self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pswd, db=self.db, + port=int(self.port), charset=self.charset) + + self.curs = self.conn.cursor() + + self.curs.execute('SET NAMES utf8') + except: + log.error('%s mysql connect error' % self.host) + raise ValueError('mysql connect error %s' % self.host) + + # 释放资源 + def __del__(self): + self.curs.close() + self.conn.close() + + # 插入 + + def insert(self, table, data): + _field = ','.join(['`%s`' % (k_insert) for k_insert in data.keys()]) + _value = ','.join(["'%s'" % (str(v_insert).replace("'", "\'")) + for v_insert in data.values()]) + # 拼接成sql语句 + _sql = 'INSERT INTO `%s`(%s) VALUES(%s)' % (table, _field, _value) + + # 执行 + self.curs.lastrowid = 0 + try: + self.curs.execute(_sql) + # 提交 + self.conn.commit() + # log.info('%s insert ' % _sql) + + except: + self.conn.rollback() + log.error('%s insert error' % _sql) + raise ValueError('112,insert error %s' % _sql) + + return self.curs.lastrowid + + # 更新 + def update(self, table, data, condition): + _field = ','.join(["`%s`='%s'" % (k_update, str( + data[k_update]).replace("'", "\'")) for k_update in data]) + + _sql = 'UPDATE `%s` SET %s WHERE %s' % (table, _field, condition) + # 执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + # 提交 + self.conn.commit() + # log.info('%s update ' % _sql) + except Exception: + log.error("run {_sql} failed", exc_info=True) + self.conn.rollback() + # log.error('%s update error' % _sql) + raise ValueError('update error %s' % _sql) + + return resNum + + # 删除 + def delete(self, table, condition): + _sql = 'DELETE FROM `%s` WHERE %s' % (table, condition) + + # 执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + # 提交 + self.conn.commit() + # log.info('%s delete ' % _sql) + + except: + self.conn.rollback() + log.error('%s delete error' % _sql) + raise ValueError('112,delete error %s' % _sql) + + return resNum + + # 直接给修改语句执行 + def change(self, sql, many=False): + # 过滤unknow table的warning + warnings.filterwarnings('ignore') + resNum = 0 + if many: + try: + # 多条同时插入 + resNum = self.curs.executemany(sql, many) + self.conn.commit() + # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('exec error %s' % sql) + else: + try: + resNum = self.curs.execute(sql) + # 提交 + self.conn.commit() + # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('112,exec error %s' % sql) + + return resNum + + # 查询 + def query(self, sql): + res = '' + try: + self.curs.execute(sql) + res = self.curs.fetchall() + # log.info('%s query ' % sql) + + except: + log.error('%s query error' % sql) + + # raise ValueError('query error %s'% sql) + + return res + + +if __name__ == "__main__": + args = dict() + args['host'] = '172.16.17.164' + args['user'] = 'miles' + args['pswd'] = 'aspect' + args['db'] = 'test' + sql_sel = "select * from bigdata_host limit 5" + m = MysqlBase(**args) + data = m.query(sql=sql_sel) + m.insert('bigdata_host', {'hostname': 'ccc', 'remark': 'ddd', 'up_addr_p': '192.168.136.41', 'states': '4', + 'enter_time': '2017-03-13'}) + m.delete('bigdata_host', 'hostname="ccc"') diff --git a/scripts/wjtx/__init__.py b/scripts/wjtx/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/wjtx/tools.py b/scripts/wjtx/tools.py new file mode 100644 index 0000000..ea663ce --- /dev/null +++ b/scripts/wjtx/tools.py @@ -0,0 +1,130 @@ +import time +from scripts.common.ansible import AnsiInterface, write_host +import redis +from ops.mmysql import MysqlBase +import json +from scripts.logger import logger as log + +config = { + "global_redis_conf": { + "host": "10.10.6.4", + "port": "6379" + }, + "game_conf": { + "host": "10.10.6.2", + "port": 3306, + "user": "wjtx", + "pswd": "mMWA4DKCfeOL" + } +} + + +class WjtxTools: + def __init__(self) -> None: + self.mysql_conf = config['game_conf'] + self.redis_conf = config['global_redis_conf'] + + async def change_start_time(self, serverid: int, start_time: str) -> bool: + self.serverid = serverid + self.mysql_conf['db'] = f"legend_{self.serverid}" + if self.check_serverid(): + if self._change_db(start_time): + if self._restart_service(): + return True + else: + return False + else: + return False + else: + return False + + def _reset_worldboss(self, values): + legend_db = MysqlBase(**self.mysql_conf) + tablename = "world_boss" + u_data = {'worldboss_level': values} + try: + legend_db.update(tablename, u_data, True) + log.info(f"change world_boss to {values} success!") + return True + except Exception as e: + log.error(e) + return False + + # def _restart_stage(self): + # global_redis = redis.Redis(host=self.redis_conf['host'], port=self.redis_conf['port'], charset='utf8', + # db=self.redis_conf.get('db') or 0) + # host_info = json.loads(global_redis.hget('game_info', self.serverid)) + # hostip = host_info.get('innerIp', None) + # if not hostip: + # log.error(f"get host with {self.serverid} failed ,{host_info}") + # return False + + # restart_cmd = f"su - kingsome -c 'cd /data/apps/game1009stage && sh restart.sh {self.serverid}'" + # hostfilename = write_host(hostip) + # time.sleep(1) + # myansible = AnsiInterface(hostfilename) + # resule = myansible.exec_command(restart_cmd) + # log.debug(resule) + # if not (resule['failed'] or resule['unreachable']): + # log.info(f"restart {self.serverid} success!") + # return True + # else: + # log.error(f"restart {self.serverid} failed,output was {resule}") + # return False + + def _change_db(self, opentime): + legend_db = MysqlBase(**self.mysql_conf) + # update server_info set start_time='2020-03-16 11:00:00'; + tablename = "server_info" + u_data = {'start_time': opentime} + try: + legend_db.update(tablename, u_data, True) + log.info("change db success!") + return True + except Exception as e: + log.error(e) + return False + + def _restart_service(self): + global_redis = redis.Redis(host=self.redis_conf['host'], port=self.redis_conf['port'], charset='utf8', + db=self.redis_conf.get('db') or 0) + host_info = json.loads(global_redis.hget('game_info', self.serverid)) + hostip = host_info.get('innerIp', None) + if not hostip: + log.error(f"get host info with {self.serverid} failed!") + if int(self.serverid) < 20000: + restart_cmd = f"su - kingsome -c 'cd /data/apps/game1009game && sh restart.sh {self.serverid}'" + else: + restart_cmd = f"su - kingsome -c 'cd /data/apps/game1012game && sh restart.sh {self.serverid}'" + hostfilename = write_host(hostip) + time.sleep(1) + myansible = AnsiInterface(hostfilename) + resule = myansible.exec_command(restart_cmd) + log.debug(resule) + if not (resule['failed'] or resule['unreachable']): + log.info(f"restart {self.serverid} success!") + return True + else: + log.error(f"restart {self.serverid} failed,output was {resule}") + return False + + def check_serverid(self): + global_redis = redis.Redis(host=self.redis_conf['host'], port=self.redis_conf['port'], charset='utf8', + db=self.redis_conf.get('db') or 0) + server_state = global_redis.hget('server_state', self.serverid) + if int(server_state) != 12: + return False + else: + log.info(f"{self.serverid} check ok!") + return True + + async def change_worldboss(self, serverid: int, values: int) -> bool: + self.serverid = serverid + self.mysql_conf['db'] = f"legend_{self.serverid}" + if self._reset_worldboss(values): + if self._restart_stage(): + return True + else: + return False + else: + return False