From 431fc5cd2588a163bab392dfaf70556c86e431d2 Mon Sep 17 00:00:00 2001
From: pengtao
Date: Fri, 25 Jun 2021 10:24:38 +0800
Subject: [PATCH] 1

---
 jobs.sqlite                        | Bin 0 -> 16384 bytes
 jobs/__init__.py                   |   0
 jobs/jobs.py                       |  40 ++++++++
 jobs/route.py                      | 129 +++++++++++++++++++++++++++++
 main.py                            |  15 ++--
 ops/common/{common.py => route.py} |   0
 ops/deploy/{deploy.py => route.py} |   6 +-
 scripts/common/__init__.py         |  23 ++---
 scripts/common/deploy.py           |  73 +++++++++++++---
 scripts/logger.py                  |   4 +-
 10 files changed, 258 insertions(+), 32 deletions(-)
 create mode 100644 jobs.sqlite
 create mode 100644 jobs/__init__.py
 create mode 100644 jobs/jobs.py
 create mode 100644 jobs/route.py
 rename ops/common/{common.py => route.py} (100%)
 rename ops/deploy/{deploy.py => route.py} (83%)

diff --git a/jobs.sqlite b/jobs.sqlite
new file mode 100644
index 0000000000000000000000000000000000000000..e5392716dae6879e43d854c2380e827d561b5b19
GIT binary patch
literal 16384
[16 KB base85-encoded binary payload omitted: the pre-created SQLite job store]

diff --git a/jobs/__init__.py b/jobs/__init__.py
new file mode 100644
index 0000000..e69de29

diff --git a/jobs/jobs.py b/jobs/jobs.py
new file mode 100644
--- /dev/null
+++ b/jobs/jobs.py
@@ -0,0 +1,40 @@
+import datetime
+import json
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
+from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
+from scripts.logger import logger
+
+# persist jobs to the bundled jobs.sqlite so they survive restarts
+job_stores = {
+    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
+}
+executors = {
+    'default': ThreadPoolExecutor(20),
+    'processpool': ProcessPoolExecutor(5)
+}
+job_defaults = {
+    'coalesce': False,
+    'max_instances': 3
+}
+Schedule = AsyncIOScheduler(
+    jobstores=job_stores, executors=executors, job_defaults=job_defaults)
+
+
+def job_execute(event):
+    """Event listener: log the code, job id and jobstore of a finished job."""
+    logger.info("job_execute code => {}\njob.id => {}\njobstore=>{}".format(
+        event.code,
+        event.job_id,
+        event.jobstore
+    ))
+
+
+def aps_test1(x):
+    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), json.dumps(x))
+    logger.info("run aps_test1,msg={}".format(json.dumps(x)))
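The scheduler above is what main.py starts on application startup and attaches
the job_execute listener to. A minimal standalone sketch of that wiring,
runnable on its own provided apscheduler and SQLAlchemy are installed; the
demo_jobs.sqlite path and the tick job are illustrative, not part of this
patch:

    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

    def tick():
        print("tick")

    def on_done(event):
        # fires after every run; event.code tells EXECUTED apart from ERROR
        print(event.job_id, event.code, event.jobstore)

    sched = BlockingScheduler(
        jobstores={"default": SQLAlchemyJobStore(url="sqlite:///demo_jobs.sqlite")})
    sched.add_listener(on_done, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    sched.add_job(tick, "interval", seconds=5, id="demo_tick",
                  replace_existing=True)  # the jobstore persists across runs
    sched.start()  # blocks; press Ctrl-C to stop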
diff --git a/jobs/route.py b/jobs/route.py
new file mode 100644
index 0000000..2d28478
--- /dev/null
+++ b/jobs/route.py
@@ -0,0 +1,129 @@
+import time
+from typing import Union
+from datetime import datetime
+from fastapi import Query, Body, APIRouter
+from apscheduler.triggers.cron import CronTrigger
+from scripts.logger import logger
+from jobs.jobs import Schedule, aps_test1
+
+router = APIRouter()
+
+
+# simple response envelopes
+def resp_ok(*, code=0, msg="ok", data: Union[list, dict, str] = None) -> dict:
+    return {"code": code, "msg": msg, "data": data}
+
+
+def resp_fail(*, code=1, msg="fail", data: Union[list, dict, str] = None) -> dict:
+    return {"code": code, "msg": msg, "data": data}
+
+
+def cron_task(a1: str) -> None:
+    print(a1, time.strftime("%Y-%m-%d %H:%M:%S"))
+
+
+@router.get("/all", tags=["schedule"], summary="List all jobs")
+async def get_scheduled_syncs():
+    """Return id, target function, trigger and next run time for every job."""
+    schedules = []
+    for job in Schedule.get_jobs():
+        schedules.append(
+            {"job_id": job.id, "func_name": job.func_ref, "func_args": job.args,
+             "cron_model": str(job.trigger), "next_run": str(job.next_run_time)}
+        )
+    return resp_ok(data=schedules)
+
+
+@router.get("/once", tags=["schedule"], summary="Get one job by id")
+async def get_target_sync(
+        job_id: str = Query("job1", title="job id")
+):
+    job = Schedule.get_job(job_id=job_id)
+
+    if not job:
+        return resp_fail(msg=f"not found job {job_id}")
+
+    return resp_ok(
+        data={"job_id": job.id, "func_name": job.func_ref, "func_args": job.args,
+              "cron_model": str(job.trigger), "next_run": str(job.next_run_time)})
+
+
+# interval: run repeatedly at a fixed interval
+@router.post("/interval/schedule/", tags=["schedule"], summary="Add job: fixed interval")
+async def add_interval_job(
+        seconds: int = Body(120, title="interval in seconds, default 120", embed=True),
+        job_id: str = Body(..., title="job id", embed=True),
+        run_time: int = Body(time.time(), title="first run time",
+                             description="defaults to now; note the default is captured at import time", embed=True)
+):
+    res = Schedule.get_job(job_id=job_id)
+    if res:
+        return resp_fail(msg=f"{job_id} job already exists")
+    schedule_job = Schedule.add_job(cron_task,
+                                    'interval',
+                                    args=(job_id,),
+                                    seconds=seconds,  # interval between runs
+                                    id=job_id,  # job ID
+                                    next_run_time=datetime.fromtimestamp(
+                                        run_time)  # first run
+                                    )
+    return resp_ok(data={"job_id": schedule_job.id})
+
+
+# date: run exactly once at a given time
+@router.post("/date/schedule/", tags=["schedule"], summary="Add job: run once at a fixed time")
+async def add_date_job(cronname: str, args: tuple,
+                       run_time: str = Body(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
+                                            title="run time",
+                                            description="run exactly once at this time", embed=True),
+                       job_id: str = Body(..., title="job id", embed=True),
+                       ):
+    res = Schedule.get_job(job_id=job_id)
+    if res:
+        return resp_fail(msg=f"{job_id} job already exists")
+    # cronname is a textual reference such as "jobs.jobs:aps_test1", so the
+    # job can be re-imported when loaded back from the persistent jobstore
+    schedule_job = Schedule.add_job(cronname,
+                                    'date',
+                                    args=args,
+                                    run_date=datetime.strptime(
+                                        run_time, '%Y-%m-%d %H:%M:%S'),
+                                    id=job_id,  # job ID
+                                    )
+    logger.info("add job {}".format(schedule_job.id))
+    return resp_ok(data={"job_id": schedule_job.id})
+
+
+# cron: the most flexible trigger, accepts a crontab expression
+@router.post("/cron/schedule/", tags=["schedule"], summary="Add job: crontab expression")
+async def add_cron_job(
+        job_id: str = Body(..., title="job id", embed=True),
+        crontab: str = Body('*/1 * * * *', title="crontab expression"),
+        run_time: int = Body(time.time(), title="first run time",
+                             description="defaults to now", embed=True)
+):
+    res = Schedule.get_job(job_id=job_id)
+    if res:
+        return resp_fail(msg=f"{job_id} job already exists")
+    schedule_job = Schedule.add_job(cron_task,
+                                    CronTrigger.from_crontab(crontab),
+                                    args=(job_id,),
+                                    id=job_id,  # job ID
+                                    next_run_time=datetime.fromtimestamp(
+                                        run_time)
+                                    )
+
+    return resp_ok(data={"job_id": schedule_job.id})
+
+
+@router.post("/del", tags=["schedule"], summary="Remove a job")
+async def remove_schedule(
+        job_id: str = Body(..., title="job id", embed=True)
+):
+    res = Schedule.get_job(job_id=job_id)
+    if not res:
+        return resp_fail(msg=f"not found job {job_id}")
+    Schedule.remove_job(job_id)
+    logger.info("remove job with id={}".format(job_id))
+    return resp_ok()
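A note on the date endpoint: it takes the job function as a string (cronname)
rather than a callable. apscheduler accepts such textual references, which is
what lets a job live in a persistent jobstore and be re-imported later. A
sketch of the equivalent direct call, assuming this repository's module layout
("jobs.jobs:aps_test1" is the textual reference; the date and args are made
up):

    from apscheduler.schedulers.background import BackgroundScheduler

    sched = BackgroundScheduler()
    sched.start()
    sched.add_job("jobs.jobs:aps_test1",           # re-importable by reference
                  "date",
                  run_date="2021-07-01 12:00:00",  # parsed by the date trigger
                  args=({"msg": "hello"},),        # aps_test1 takes one argument
                  id="one_off_demo")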
diff --git a/main.py b/main.py
index 0068707..d80497d 100644
--- a/main.py
+++ b/main.py
@@ -2,13 +2,15 @@
 from config.config import settings
 from fastapi import Depends, FastAPI, BackgroundTasks, Request
 from dependencies import get_token_header
-from ops.common import common
-from ops.deploy import deploy
+from ops.common import route as common_route
+from ops.deploy import route as deploy_route
+from jobs import route as jobs_route
 from typing import Optional
 from scripts.common.redis import get_redis_pool
 import pdb
 from scripts.logger import logger
-
+from apscheduler.events import EVENT_JOB_EXECUTED
+from jobs.jobs import Schedule, job_execute

 tags_metadata = [
     # {
     #     "name": "common",
@@ -28,8 +30,9 @@ tags_metadata = [
 def create_app():
     application = FastAPI(dependencies=[Depends(get_token_header)],
                           openapi_tags=tags_metadata)
-    application.include_router(common.router, prefix="/common")
-    application.include_router(deploy.router, prefix="/deploy")
+    application.include_router(common_route.router, prefix="/common")
+    application.include_router(deploy_route.router, prefix="/deploy")
+    application.include_router(jobs_route.router, prefix="/jobs")
     return application


@@ -39,6 +42,8 @@ app = create_app()
 @app.on_event("startup")
 async def startup_event():
     app.state.redis = await get_redis_pool()
+    Schedule.start()
+    Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED)


 @app.on_event("shutdown")

diff --git a/ops/common/common.py b/ops/common/route.py
similarity index 100%
rename from ops/common/common.py
rename to ops/common/route.py

diff --git a/ops/deploy/deploy.py b/ops/deploy/route.py
similarity index 83%
rename from ops/deploy/deploy.py
rename to ops/deploy/route.py
index 54f8f37..0ea2fc6 100644
--- a/ops/deploy/deploy.py
+++ b/ops/deploy/route.py
@@ -4,12 +4,14 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse
 from config.config import settings
 from scripts.common.deploy import ProjectInfo, deploy_service
-from uuid import uuid1
+from uuid import uuid4

 router = APIRouter()


-@router.post("/server")
-async def simple_send(project: ProjectInfo, tag: str, background_tasks: BackgroundTasks) -> JSONResponse:
-    uuid = uuid1
-    background_tasks.add_task(deploy_service, project, tag, uuid)
-    return JSONResponse(status_code=200, content={"message": "{0} {1} deploy success,uuid={2}!".format(project.name, tag, uuid)})
+@router.post("/server", tags=["deploy"], summary="Deploy a service to its target hosts")
+async def simple_send(request: Request, project: ProjectInfo, tag: str, background_tasks: BackgroundTasks) -> JSONResponse:
+    uuid = uuid4().hex
+    # deploy_service takes the request first so it can reach app.state.redis
+    background_tasks.add_task(deploy_service, request, project, tag, uuid)
+    # the task runs in the background, so report submission rather than success
+    return JSONResponse(status_code=200, content={"message": "{0} {1} deploy task submitted, uuid={2}".format(project.name, tag, uuid)})

diff --git a/scripts/common/__init__.py b/scripts/common/__init__.py
index 4c5ec20..3658478 100644
--- a/scripts/common/__init__.py
+++ b/scripts/common/__init__.py
@@ -1,8 +1,9 @@
 import subprocess
-import logging
+from scripts.logger import logger


-def run_cmd(cmd, shell=True):
+@logger.catch
+def run_cmd(cmd, shell=True, timeout=120):
     '''
-    Run command with arguments, wait to complete and return ``True`` on success.
+    Run command with arguments, wait for it to complete and return the
+    captured output together with a success flag.

-    :returns  : ``True`` on success, otherwise ``None``.
-    :rtype    : ``bool``
+    :returns  : ``(output_bytes, status)``; ``status`` is ``True`` on success.
+    :rtype    : ``tuple``
     '''
@@ -11,13 +12,18 @@
-    # logger = logger.getLogger('SPOT.INGEST.COMMON.UTIL')
-    logging.debug('Execute command: {0}'.format(cmd))
-
+    logger.debug('Execute command: {0}'.format(cmd))
+    status = True
     try:
-        subprocess.call(cmd, shell=shell)
-        return True
-
-    except Exception as exc:
-        logging.error('[{0}] {1}'.format(
-            exc.__class__.__name__, exc.message))
+        out_bytes = subprocess.check_output(
+            cmd, shell=shell, stderr=subprocess.STDOUT, timeout=timeout)
+    except subprocess.CalledProcessError as e:
+        out_bytes = e.output      # output generated before the error
+        code = e.returncode       # non-zero return code
+        logger.error(f"run {cmd} failed, out={out_bytes}, code={code}")
+        status = False
+    except subprocess.TimeoutExpired as e:
+        out_bytes = e.output      # may be None if nothing was captured
+        logger.error(f"run {cmd} timed out after {timeout}s")
+        status = False
+    return out_bytes, status
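Callers of run_cmd now receive an (output, status) pair instead of a bare
boolean. A usage sketch; the command itself is illustrative:

    from scripts.common import run_cmd

    out, ok = run_cmd("git --version", timeout=30)
    if ok:
        print(out.decode().strip())  # check_output captures bytes
    else:
        print("command failed; details were logged")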
diff --git a/scripts/common/deploy.py b/scripts/common/deploy.py
index d6c73ba..cf49b19 100644
--- a/scripts/common/deploy.py
+++ b/scripts/common/deploy.py
@@ -1,11 +1,12 @@
 import re
 from pydantic import BaseModel
 from scripts.common.cvs import GitRepository
 import os
 from fastapi import HTTPException, Request
 from . import run_cmd
 from .ansible import AnsibleAPI, write_host
 from scripts.logger import logger
+from loguru import logger as base_logger


 class ProjectInfo(BaseModel):
@@ -17,23 +18,67 @@ class ProjectInfo(BaseModel):
     name: str


+def create_deploy_log(uuid):
+    """Attach a per-deploy loguru sink named after the uuid and return it."""
+    basedir = os.path.dirname(os.path.abspath(__file__))
+    log_path = os.path.join(basedir, 'logs')
+
+    if not os.path.exists(log_path):
+        os.mkdir(log_path)
+    log_path_deploy = os.path.join(log_path, f'{uuid}_deploy.log')
+    base_logger.add(log_path_deploy, retention="20 days")
+    return base_logger
+
+
 async def deploy_service(request: Request, project: ProjectInfo, tag: str, uuid: str):
     client = request.app.state.redis
-    dd = await client.set('x_token')
-    # git clone
+    token = await client.get('x_token')  # look up the cached token
+    yaml_files = "./playbook/1.yaml"
+    deploy_log = create_deploy_log(uuid)  # add the per-deploy sink only once
+    tag_files = build_project(project, deploy_log, tag)
+    if tag_files:
+        run_data = {"source": tag_files}
+        if publish_remote(project.host, yaml_files, run_data, deploy_log):
+            logger.info(f"deploy {project.name} with {tag} success!")
+        else:
+            logger.error("push remote failed!")
+    else:
+        logger.error("git failed")
+
+
+@logger.catch
+def build_project(project: ProjectInfo, deploy_log, tag: str):
+    # git clone and switch to the requested tag
     local_path = os.path.join(project.base_dir, tag)
     #print(local_path, project.git_url, tag)
-    git = GitRepository(local_path=local_path, repo_url=project.git_url)
-    git.pull()
-    git.change_to_tag(tag)
+    try:
+        git = GitRepository(local_path=local_path, repo_url=project.git_url)
+        git.pull()
+        git.change_to_tag(tag)
+        deploy_log.debug(f"git files from {project.name}")
+    except Exception:
+        logger.error(f"git {project.name} failed")
+        deploy_log.error(f"git {project.name} failed")
+        return False

     # run pre scripts
-    run_cmd(project.pre_script)
+    out, status = run_cmd(project.pre_script)
+    if not status:
+        return False

     # find tag files
-    hosts = write_host(project.host)
-    an = AnsibleAPI(hosts)
-    run_data = {
-        "desc": "a"
-    }
-    run_out = an.run_playbook('test,yml', run_data)
-    logger.info(run_out)
+    tag_files = "1.tar.gz"  # placeholder until artifact discovery is implemented
+    return tag_files
+
+
+@logger.catch
+def publish_remote(host: str, yaml_files: str, run_data: dict, deploy_log):
+    # run ansible-playbook: push the tag files and run the start script remotely
+    hosts = write_host(host)
+    an = AnsibleAPI(hosts)
+    run_out = an.run_playbook(yaml_files, run_data)
+    if re.findall('error', run_out):
+        deploy_log.error("deploy failed")
+        deploy_log.error(run_out)
+        return False
+    else:
+        deploy_log.info(run_out)
+        return True

diff --git a/scripts/logger.py b/scripts/logger.py
index dc30da9..4f6ab0c 100644
--- a/scripts/logger.py
+++ b/scripts/logger.py
@@ -2,8 +2,8 @@
 import os
 import time
 from loguru import logger

-basedir = os.path.dirname(os.path.dirname(
-    os.path.dirname(os.path.abspath(__file__))))
+basedir = os.path.dirname(
+    os.path.dirname(os.path.abspath(__file__)))

 # print(f"log basedir{basedir}")  # /xxx/python_code/FastAdmin/backend/app
 # locate the log file directory
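A quick smoke test for the new scheduler endpoints once the app is running.
This assumes the service listens on 127.0.0.1:8000 and that get_token_header
expects an "x-token" header (both are assumptions; adjust to your config):

    import requests

    BASE = "http://127.0.0.1:8000"
    HEADERS = {"x-token": "change-me"}  # placeholder; match get_token_header

    r = requests.post(f"{BASE}/jobs/interval/schedule/",
                      json={"seconds": 60, "job_id": "job1"},
                      headers=HEADERS)
    print(r.json())  # expected: {"code": 0, "msg": "ok", "data": {"job_id": "job1"}}

    print(requests.get(f"{BASE}/jobs/all", headers=HEADERS).json())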