This commit is contained in:
pengtao 2021-07-06 17:24:30 +08:00
parent 431fc5cd25
commit 9e4adf241f
11 changed files with 363 additions and 43 deletions

View File

@ -24,5 +24,12 @@ class Settings(BaseSettings):
root_path_in_servers: Optional[bool] = False
root_path: str = '/api/v1'
origins: list = [
"http://*.kingsome.cn",
"https://*.kingsome.cn",
"http://localhost",
"http://localhost:8080",
]
settings = Settings()

Binary file not shown.

View File

@ -38,3 +38,7 @@ def job_execute(event):
def aps_test1(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), json.dumps(x))
logger.info("run appt_test1,msg={}".format(json.dumps(x)))
def app_test01(msg: str = "hi"):
logger.info(f"run with {msg}")

View File

@ -24,6 +24,10 @@ def cron_task(a1: str) -> None:
print(a1, time.strftime("'%Y-%m-%d %H:%M:%S'"))
def app_test01(msg: str = "hi"):
logger.info(f"run with {msg}")
@router.get("/all", tags=["schedule"], summary="获取所有job信息")
async def get_scheduled_syncs():
"""
@ -55,16 +59,15 @@ async def get_target_sync(
# interval 固定间隔时间调度
@router.post("/interval/schedule/", tags=["schedule"], summary="开启定时:间隔时间循环")
async def add_interval_job(
seconds: int = Body(120, title="循环间隔时间/秒,默认120s", embed=True),
job_id: str = Body(..., title="任务id", embed=True),
run_time: int = Body(time.time(), title="第一次运行时间",
description="默认立即执行", embed=True)
):
async def add_interval_job(seconds: int = Body(120, title="循环间隔时间/秒,默认120s", embed=True),
job_id: str = Body(..., title="任务id", embed=True),
run_time: int = Body(time.time(), title="第一次运行时间",
description="默认立即执行", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
schedule_job = Schedule.add_job(cron_task,
schedule_job = Schedule.add_job(app_test01,
'interval',
args=(job_id,),
seconds=seconds, # 循环间隔时间 秒

21
main.py
View File

@ -4,13 +4,16 @@ from fastapi import Depends, FastAPI, BackgroundTasks, Request
from dependencies import get_token_header
from ops.common import route as common_route
from ops.deploy import route as deploy_route
from ops.wjtx import route as wjtx_route
from jobs import route as jobs_route
from typing import Optional
from scripts.common.redis import get_redis_pool
import pdb
from scripts.logger import logger
from apscheduler.events import EVENT_JOB_EXECUTED
from jobs.jobs import Schedule, job_execute
from fastapi.middleware.cors import CORSMiddleware
# from apscheduler.events import EVENT_JOB_EXECUTED
# from jobs.jobs import Schedule, job_execute
tags_metadata = [
# {
# "name": "common",
@ -32,18 +35,26 @@ def create_app():
openapi_tags=tags_metadata)
application.include_router(common_route.router, prefix="/common")
application.include_router(deploy_route.router, prefix="/deploy")
application.include_router(jobs_route.router, prefix="/jobs")
application.include_router(wjtx_route.router, prefix="/wjtx")
# application.include_router(jobs_route.router, prefix="/jobs")
return application
app = create_app()
app.add_middleware(
CORSMiddleware,
allow_origins=settings.origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def startup_event():
app.state.redis = await get_redis_pool()
Schedule.start()
Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED)
# Schedule.start()
# Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED)
@app.on_event("shutdown")

21
ops/wjtx/route.py Normal file
View File

@ -0,0 +1,21 @@
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form
from starlette.requests import Request
from starlette.responses import JSONResponse
from config.config import settings
from scripts.wjtx.tools import WjtxTools
from uuid import uuid4
router = APIRouter()
@router.post("/change_starttime", tags=["wjtx"], summary="修改服务器启动时间")
async def change_starttime(request: Request, serverid: int, starttime: str) -> JSONResponse:
wj = WjtxTools()
await wj.change_start_time(serverid, starttime)
return JSONResponse(status_code=200, content={"message": "restart server {0},starttime={1}!".format(serverid, starttime)})
@router.post("/change_worldboss", tags=["wjtx"], summary="修改世界boss血量配置")
async def change_worldboss(request: Request, serverid: int, values: int) -> JSONResponse:
wj = WjtxTools()
await wj.change_worldboss(serverid, values)
return JSONResponse(status_code=200, content={"message": "change {0} worldboss to {1}!".format(serverid, values)})

View File

@ -1,30 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException
from ...dependencies import get_token_header
router = APIRouter(
prefix="/wjtx",
tags=["wjtx"],
dependencies=[Depends(get_token_header)],
responses={404: {"description": "Not found"}},
)
@router.get("/")
async def read_items():
return {"msg": "1"}
# @router.get("/{itemid}")
# async def read_item(item_id: str):
# if item_id not in fake_item_db:
# raise HTTPException(status_code=404, detail="Item not found")
# return {"name": fake_item_db[item_id]["name"], "item_id": item_id}
# @router.put("/{item_id}", tags=["custom"], responses={403: {"description": "Operation forbidden"}})
# async def update_item(item_id: str):
# if item_id != "pulmbus":
# raise HTTPException(
# status_code=403, detail="You can only update the item plumbus")
# return {"item_id": item_id, "name": "The great plumbus"}

View File

@ -8,7 +8,9 @@ from fastapi import HTTPException, Request
from . import run_cmd
from .ansible import AnsibleAPI, write_host
from scripts.logger import logger
from loguru import logger as base_logger, os, time
from loguru import logger as base_logger
import os
import time
class ProjectInfo(BaseModel):

172
scripts/common/mymysql.py Normal file
View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
date: 2016/07/11
role: mysql的增删改查类
usage: m = mysqlBase(host='xxx',db='xxx',user='xxx',pwd='xxx') 实例化
m.insert('core',{'host_name':'ccc','process_name':'ddd','ip_addr':'192.168.136.41','status':'4'})
m.update('table',{'field1':'value1','field2':'value2'},'id=1') 更新表名 字段名 条件
m.delete('core','status=5 and id=12')
m.change("update core set a='aaa' where id=1") 可以多条插入
m.query("select * from core")
"""
from scripts.logger import logger as log
import warnings
import pymysql as MySQLdb
# mysql操作类
class MysqlBase:
# 连接数据库
def __init__(self, **args):
# 获取参数
self.host = args.get('host', 'localhost')
self.user = args.get('user')
self.pswd = args.get('pswd')
self.db = args.get('db', 'mysql')
self.port = args.get('port', '3306')
self.charset = args.get('charset', 'utf8')
try:
self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pswd, db=self.db,
port=int(self.port), charset=self.charset)
self.curs = self.conn.cursor()
self.curs.execute('SET NAMES utf8')
except:
log.error('%s mysql connect error' % self.host)
raise ValueError('mysql connect error %s' % self.host)
# 释放资源
def __del__(self):
self.curs.close()
self.conn.close()
# 插入
def insert(self, table, data):
_field = ','.join(['`%s`' % (k_insert) for k_insert in data.keys()])
_value = ','.join(["'%s'" % (str(v_insert).replace("'", "\'"))
for v_insert in data.values()])
# 拼接成sql语句
_sql = 'INSERT INTO `%s`(%s) VALUES(%s)' % (table, _field, _value)
# 执行
self.curs.lastrowid = 0
try:
self.curs.execute(_sql)
# 提交
self.conn.commit()
# log.info('%s insert ' % _sql)
except:
self.conn.rollback()
log.error('%s insert error' % _sql)
raise ValueError('112,insert error %s' % _sql)
return self.curs.lastrowid
# 更新
def update(self, table, data, condition):
_field = ','.join(["`%s`='%s'" % (k_update, str(
data[k_update]).replace("'", "\'")) for k_update in data])
_sql = 'UPDATE `%s` SET %s WHERE %s' % (table, _field, condition)
# 执行
resNum = 0
try:
resNum = self.curs.execute(_sql)
# 提交
self.conn.commit()
# log.info('%s update ' % _sql)
except Exception:
log.error("run {_sql} failed", exc_info=True)
self.conn.rollback()
# log.error('%s update error' % _sql)
raise ValueError('update error %s' % _sql)
return resNum
# 删除
def delete(self, table, condition):
_sql = 'DELETE FROM `%s` WHERE %s' % (table, condition)
# 执行
resNum = 0
try:
resNum = self.curs.execute(_sql)
# 提交
self.conn.commit()
# log.info('%s delete ' % _sql)
except:
self.conn.rollback()
log.error('%s delete error' % _sql)
raise ValueError('112,delete error %s' % _sql)
return resNum
# 直接给修改语句执行
def change(self, sql, many=False):
# 过滤unknow table的warning
warnings.filterwarnings('ignore')
resNum = 0
if many:
try:
# 多条同时插入
resNum = self.curs.executemany(sql, many)
self.conn.commit()
# log.info('%s exec ' % sql)
except:
self.conn.rollback()
log.error('%s exec error' % sql)
raise ValueError('exec error %s' % sql)
else:
try:
resNum = self.curs.execute(sql)
# 提交
self.conn.commit()
# log.info('%s exec ' % sql)
except:
self.conn.rollback()
log.error('%s exec error' % sql)
raise ValueError('112,exec error %s' % sql)
return resNum
# 查询
def query(self, sql):
res = ''
try:
self.curs.execute(sql)
res = self.curs.fetchall()
# log.info('%s query ' % sql)
except:
log.error('%s query error' % sql)
# raise ValueError('query error %s'% sql)
return res
if __name__ == "__main__":
args = dict()
args['host'] = '172.16.17.164'
args['user'] = 'miles'
args['pswd'] = 'aspect'
args['db'] = 'test'
sql_sel = "select * from bigdata_host limit 5"
m = MysqlBase(**args)
data = m.query(sql=sql_sel)
m.insert('bigdata_host', {'hostname': 'ccc', 'remark': 'ddd', 'up_addr_p': '192.168.136.41', 'states': '4',
'enter_time': '2017-03-13'})
m.delete('bigdata_host', 'hostname="ccc"')

0
scripts/wjtx/__init__.py Normal file
View File

130
scripts/wjtx/tools.py Normal file
View File

@ -0,0 +1,130 @@
import time
from scripts.common.ansible import AnsiInterface, write_host
import redis
from ops.mmysql import MysqlBase
import json
from scripts.logger import logger as log
config = {
"global_redis_conf": {
"host": "10.10.6.4",
"port": "6379"
},
"game_conf": {
"host": "10.10.6.2",
"port": 3306,
"user": "wjtx",
"pswd": "mMWA4DKCfeOL"
}
}
class WjtxTools:
def __init__(self) -> None:
self.mysql_conf = config['game_conf']
self.redis_conf = config['global_redis_conf']
async def change_start_time(self, serverid: int, start_time: str) -> bool:
self.serverid = serverid
self.mysql_conf['db'] = f"legend_{self.serverid}"
if self.check_serverid():
if self._change_db(start_time):
if self._restart_service():
return True
else:
return False
else:
return False
else:
return False
def _reset_worldboss(self, values):
legend_db = MysqlBase(**self.mysql_conf)
tablename = "world_boss"
u_data = {'worldboss_level': values}
try:
legend_db.update(tablename, u_data, True)
log.info(f"change world_boss to {values} success!")
return True
except Exception as e:
log.error(e)
return False
# def _restart_stage(self):
# global_redis = redis.Redis(host=self.redis_conf['host'], port=self.redis_conf['port'], charset='utf8',
# db=self.redis_conf.get('db') or 0)
# host_info = json.loads(global_redis.hget('game_info', self.serverid))
# hostip = host_info.get('innerIp', None)
# if not hostip:
# log.error(f"get host with {self.serverid} failed ,{host_info}")
# return False
# restart_cmd = f"su - kingsome -c 'cd /data/apps/game1009stage && sh restart.sh {self.serverid}'"
# hostfilename = write_host(hostip)
# time.sleep(1)
# myansible = AnsiInterface(hostfilename)
# resule = myansible.exec_command(restart_cmd)
# log.debug(resule)
# if not (resule['failed'] or resule['unreachable']):
# log.info(f"restart {self.serverid} success!")
# return True
# else:
# log.error(f"restart {self.serverid} failed,output was {resule}")
# return False
def _change_db(self, opentime):
legend_db = MysqlBase(**self.mysql_conf)
# update server_info set start_time='2020-03-16 11:00:00';
tablename = "server_info"
u_data = {'start_time': opentime}
try:
legend_db.update(tablename, u_data, True)
log.info("change db success!")
return True
except Exception as e:
log.error(e)
return False
def _restart_service(self):
global_redis = redis.Redis(host=self.redis_conf['host'], port=self.redis_conf['port'], charset='utf8',
db=self.redis_conf.get('db') or 0)
host_info = json.loads(global_redis.hget('game_info', self.serverid))
hostip = host_info.get('innerIp', None)
if not hostip:
log.error(f"get host info with {self.serverid} failed!")
if int(self.serverid) < 20000:
restart_cmd = f"su - kingsome -c 'cd /data/apps/game1009game && sh restart.sh {self.serverid}'"
else:
restart_cmd = f"su - kingsome -c 'cd /data/apps/game1012game && sh restart.sh {self.serverid}'"
hostfilename = write_host(hostip)
time.sleep(1)
myansible = AnsiInterface(hostfilename)
resule = myansible.exec_command(restart_cmd)
log.debug(resule)
if not (resule['failed'] or resule['unreachable']):
log.info(f"restart {self.serverid} success!")
return True
else:
log.error(f"restart {self.serverid} failed,output was {resule}")
return False
def check_serverid(self):
global_redis = redis.Redis(host=self.redis_conf['host'], port=self.redis_conf['port'], charset='utf8',
db=self.redis_conf.get('db') or 0)
server_state = global_redis.hget('server_state', self.serverid)
if int(server_state) != 12:
return False
else:
log.info(f"{self.serverid} check ok!")
return True
async def change_worldboss(self, serverid: int, values: int) -> bool:
self.serverid = serverid
self.mysql_conf['db'] = f"legend_{self.serverid}"
if self._reset_worldboss(values):
if self._restart_stage():
return True
else:
return False
else:
return False