commit 431fc5cd25 (parent df97c9815d)
pengtao 2021-06-25 10:24:38 +08:00
10 changed files with 258 additions and 32 deletions

BIN
jobs.sqlite Normal file

Binary file not shown.

0
jobs/__init__.py Normal file

40
jobs/jobs.py Normal file

@@ -0,0 +1,40 @@
import datetime
import json

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from scripts.logger import logger

# AsyncIO scheduler persisted to a local SQLite job store, with thread and process pool executors
Schedule = AsyncIOScheduler(
jobstores={
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
},
executors={
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}, job_defaults={
'coalesce': False,
'max_instances': 3
}
)


def job_execute(event):
    """
    Listener for scheduler job-execution events.
    :param event: APScheduler event carrying the job id, exit code and job store name
    :return:
    """
    logger.info(
        "job executed:\ncode => {}\njob.id => {}\njobstore => {}".format(
            event.code,
            event.job_id,
            event.jobstore
        ))


def aps_test1(x):
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), json.dumps(x))
    logger.info("run aps_test1, msg={}".format(json.dumps(x)))
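A minimal usage sketch (not part of the commit), assuming the module layout above: it starts the scheduler inside a standalone asyncio program, registers the listener and schedules the test job. The job id, interval and sleep time are arbitrary examples.

import asyncio

from apscheduler.events import EVENT_JOB_EXECUTED

from jobs.jobs import Schedule, aps_test1, job_execute


async def demo():
    # log every successful run through the listener defined above
    Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED)
    Schedule.start()  # AsyncIOScheduler needs a running event loop
    Schedule.add_job(aps_test1, 'interval', seconds=10,
                     args=({"msg": "hello"},), id='demo_job',
                     replace_existing=True)
    await asyncio.sleep(30)  # let the job fire a few times
    Schedule.shutdown()


if __name__ == "__main__":
    asyncio.run(demo())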

129
jobs/route.py Normal file

@@ -0,0 +1,129 @@
import time
from datetime import datetime
from typing import Union

from fastapi import APIRouter, Body, Query
from apscheduler.triggers.cron import CronTrigger

from scripts.logger import logger
from jobs.jobs import Schedule, aps_test1

router = APIRouter()


# simple response helpers
def resp_ok(*, code=0, msg="ok", data: Union[list, dict, str] = None) -> dict:
    return {"code": code, "msg": msg, "data": data}


def resp_fail(*, code=1, msg="fail", data: Union[list, dict, str] = None):
    return {"code": code, "msg": msg, "data": data}


def cron_task(a1: str) -> None:
    print(a1, time.strftime('%Y-%m-%d %H:%M:%S'))
@router.get("/all", tags=["schedule"], summary="list all jobs")
async def get_scheduled_syncs():
    """
    Return every job currently registered with the scheduler.
    :return:
    """
schedules = []
for job in Schedule.get_jobs():
schedules.append(
{"job_id": job.id, "func_name": job.func_ref, "func_args": job.args, "cron_model": str(job.trigger),
"next_run": str(job.next_run_time)}
)
return resp_ok(data=schedules)
@router.get("/once", tags=["schedule"], summary="get a single job")
async def get_target_sync(
        job_id: str = Query("job1", title="job id")
):
job = Schedule.get_job(job_id=job_id)
if not job:
return resp_fail(msg=f"not found job {job_id}")
return resp_ok(
data={"job_id": job.id, "func_name": job.func_ref, "func_args": job.args, "cron_model": str(job.trigger),
"next_run": str(job.next_run_time)})
# interval: run the task repeatedly at a fixed interval
@router.post("/interval/schedule/", tags=["schedule"], summary="schedule: repeat at a fixed interval")
async def add_interval_job(
        seconds: int = Body(120, title="interval between runs in seconds, default 120", embed=True),
        job_id: str = Body(..., title="job id", embed=True),
        run_time: int = Body(time.time(), title="first run time (unix timestamp)",
                             description="defaults to now, i.e. run immediately", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
    schedule_job = Schedule.add_job(cron_task,
                                    'interval',
                                    args=(job_id,),
                                    seconds=seconds,  # interval between runs, in seconds
                                    id=job_id,  # job ID
                                    next_run_time=datetime.fromtimestamp(
                                        run_time)  # first run, defaults to now
                                    )
return resp_ok(data={"job_id": schedule_job.id})
# date: run once at a specific point in time
@router.post("/date/schedule/", tags=["schedule"], summary="schedule: run once at a fixed time")
async def add_date_job(cronname: str, args: tuple,
                       run_time: str = Body(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), title="run time",
                                            description="run exactly once at this time", embed=True),
                       job_id: str = Body(..., title="job id", embed=True),
                       ):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
schedule_job = Schedule.add_job(cronname,
'date',
args=args,
run_date=datetime.strptime(
run_time, '%Y-%m-%d %H:%M:%S'),
id=job_id, # job ID
)
logger.info("add job {}".format(schedule_job.id))
return resp_ok(data={"job_id": schedule_job.id})
# cron: the most flexible option, driven by a crontab expression
@router.post("/cron/schedule/", tags=["schedule"], summary="schedule: crontab expression")
async def add_cron_job(
        job_id: str = Body(..., title="job id", embed=True),
        crontab: str = Body('*/1 * * * *', title="crontab expression"),
        run_time: int = Body(time.time(), title="first run time (unix timestamp)",
                             description="defaults to now, i.e. run immediately", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
schedule_job = Schedule.add_job(cron_task,
CronTrigger.from_crontab(crontab),
args=(job_id,),
id=job_id, # job ID
next_run_time=datetime.fromtimestamp(
run_time)
)
return resp_ok(data={"job_id": schedule_job.id})
@router.post("/del", tags=["schedule"], summary="remove a job")
async def remove_schedule(
        job_id: str = Body(..., title="job id", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if not res:
return resp_fail(msg=f"not found job {job_id}")
Schedule.remove_job(job_id)
logger.info("remove job with id={}".format(job_id))
return resp_ok()
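A hedged client-side sketch of these endpoints, assuming the router is mounted under the /jobs prefix (as main.py does below) and the service listens on localhost:8000; the X-Token header is only a placeholder for whatever get_token_header actually checks.

import requests

BASE = "http://127.0.0.1:8000/jobs"
HEADERS = {"X-Token": "changeme"}  # placeholder auth header, adjust to get_token_header

# create an interval job that fires every 30 seconds
print(requests.post(f"{BASE}/interval/schedule/", headers=HEADERS,
                    json={"seconds": 30, "job_id": "demo_interval"}).json())

# list all registered jobs, then remove the one just created
print(requests.get(f"{BASE}/all", headers=HEADERS).json())
print(requests.post(f"{BASE}/del", headers=HEADERS, json={"job_id": "demo_interval"}).json())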

15
main.py

@@ -2,13 +2,15 @@
from config.config import settings
from fastapi import Depends, FastAPI, BackgroundTasks, Request
from dependencies import get_token_header
from ops.common import common
from ops.deploy import deploy
from ops.common import route as common_route
from ops.deploy import route as deploy_route
from jobs import route as jobs_route
from typing import Optional
from scripts.common.redis import get_redis_pool
import pdb
from scripts.logger import logger
from apscheduler.events import EVENT_JOB_EXECUTED
from jobs.jobs import Schedule, job_execute
tags_metadata = [
# {
# "name": "common",
@@ -28,8 +30,9 @@ tags_metadata = [
def create_app():
application = FastAPI(dependencies=[Depends(get_token_header)],
openapi_tags=tags_metadata)
application.include_router(common.router, prefix="/common")
application.include_router(deploy.router, prefix="/deploy")
application.include_router(common_route.router, prefix="/common")
application.include_router(deploy_route.router, prefix="/deploy")
application.include_router(jobs_route.router, prefix="/jobs")
return application
@@ -39,6 +42,8 @@ app = create_app()
@app.on_event("startup")
async def startup_event():
app.state.redis = await get_redis_pool()
Schedule.start()
Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED)
@app.on_event("shutdown")
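For completeness, a hedged sketch of launching the app; the uvicorn invocation, host and port are assumptions rather than part of this commit.

# assumed entry point: serve the FastAPI application created above
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)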


@@ -4,12 +4,12 @@ from starlette.requests import Request
from starlette.responses import JSONResponse
from config.config import settings
from scripts.common.deploy import ProjectInfo, deploy_service
from uuid import uuid1
from uuid import uuid4
router = APIRouter()
@router.post("/server")
@router.post("/server", tags=["deploy"], summary="deploy the server-side service")
async def simple_send(project: ProjectInfo, tag: str, background_tasks: BackgroundTasks) -> JSONResponse:
uuid = uuid1
uuid = uuid4().hex
background_tasks.add_task(deploy_service, project, tag, uuid)
    return JSONResponse(status_code=200, content={"message": "{0} {1} deploy task submitted, uuid={2}!".format(project.name, tag, uuid)})
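A hedged example of calling the deploy endpoint above; the host, port, token header and the ProjectInfo field names other than name are assumptions inferred from how the model is used elsewhere in this commit.

import requests

payload = {  # ProjectInfo body; field names other than "name" are assumed
    "name": "demo",
    "git_url": "git@example.com:demo/demo.git",
    "base_dir": "/data/build",
    "host": "10.0.0.10",
    "pre_script": "make build",
}
resp = requests.post("http://127.0.0.1:8000/deploy/server",
                     params={"tag": "v1.0.0"},         # tag is a query parameter
                     headers={"X-Token": "changeme"},  # placeholder auth header
                     json=payload)
print(resp.json())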


@@ -1,8 +1,9 @@
import subprocess
import logging
from scripts.log import logger
def run_cmd(cmd, shell=True):
@logger.catch
def run_cmd(cmd, shell=True, timeout=120):
'''
    Run command with arguments, wait for it to complete and return its output and a success flag.
@@ -11,13 +12,15 @@ def run_cmd(cmd, shell=True):
    :returns : tuple ``(output_bytes, status)`` where ``status`` is ``True`` on success, ``False`` on failure.
    :rtype : ``tuple``
'''
# logger = logger.getLogger('SPOT.INGEST.COMMON.UTIL')
logging.debug('Execute command: {0}'.format(cmd))
logger.debug('Execute command: {0}'.format(cmd))
status = True
try:
subprocess.call(cmd, shell=shell)
return True
out_bytes = subprocess.check_output(
cmd, shell=shell, stderr=subprocess.STDOUT, timeout=timeout)
except subprocess.CalledProcessError as e:
out_bytes = e.output # Output generated before error
code = e.returncode # Return code
logger.error(f"run {cmd} failed,out={out_bytes},code={code}")
status = False
    except Exception as exc:
        logger.error('[{0}] {1}'.format(exc.__class__.__name__, exc))
        out_bytes = b''
        status = False
return out_bytes, status
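A short usage sketch of the reworked run_cmd; the import path follows the `from . import run_cmd` used in deploy.py below, and the command itself is an arbitrary example.

from scripts.common import run_cmd

out, ok = run_cmd("git --version", timeout=30)
if ok:
    print(out.decode().strip())  # check_output returns bytes
else:
    print("command failed, output:", out)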


@@ -1,11 +1,14 @@
import os
import re
import time
from subprocess import run

from fastapi import HTTPException, Request
from loguru import logger as base_logger
from pydantic import BaseModel

from scripts.common.cvs import GitRepository
from scripts.logger import logger
from . import run_cmd
from .ansible import AnsibleAPI, write_host
class ProjectInfo(BaseModel):
@@ -17,23 +20,69 @@ class ProjectInfo(BaseModel):
name: str
def create_deploy_log(uuid):
basedir = os.path.dirname(os.path.abspath(__file__))
log_path = os.path.join(basedir, 'logs')
if not os.path.exists(log_path):
os.mkdir(log_path)
log_path_deploy = os.path.join(log_path, f'{uuid}_deploy.log')
base_logger.add(log_path_deploy, retention="20 days")
return base_logger
async def deploy_service(request: Request, project: ProjectInfo, tag: str, uuid: str):
client = request.app.state.redis
dd = await client.set('x_token')
# git clone
yaml_files = "./playbook/1.yaml"
deploy_log = create_deploy_log(uuid)
tag_files = build_project(project, deploy_log, tag)
    if tag_files:
        run_data = {"source": tag_files}
        if publish_remote(project.host, yaml_files, run_data, deploy_log):
            logger.info(f"deploy {project.name} with {tag} success!")
        else:
            logger.error("push remote failed!")
    else:
        logger.error("git failed")
@logger.catch
def build_project(project: ProjectInfo, deploy_log: logger, tag: str):
# git clone
local_path = project.base_dir+'/'+tag
#print(local_path, project.git_url, tag)
git = GitRepository(local_path=local_path, repo_url=project.git_url)
git.pull()
git.change_to_tag(tag)
try:
git = GitRepository(local_path=local_path, repo_url=project.git_url)
git.pull()
git.change_to_tag(tag)
deploy_log.debug(f"git files from {project.name}")
    except Exception:
logger.error(f"git {project.name} failed")
deploy_log.error(f"git {project.name} failed")
return False
# run pre scripts
run_cmd(project.pre_script)
out, status = run_cmd(project.pre_script)
if not status:
return False
# find tag files
hosts = write_host(project.host)
an = AnsibleAPI(hosts)
run_data = {
"desc": "a"
}
    run_out = an.run_playbook('test.yml', run_data)
logger.info(run_out)
tag_files = "1.tag.gz"
return tag_files
@logger.catch
def publish_remote(host: str, yaml_files: str, run_data: dict, deploy_log: logger):
    # run the ansible playbook to deploy the tag files and run the start script on the remote host
hosts = write_host(host)
an = AnsibleAPI(hosts)
run_out = an.run_playbook(yaml_files, run_data)
print(run_out)
if re.findall('error', run_out):
        deploy_log.error("deploy failed")
deploy_log.error(run_out)
return False
else:
deploy_log.info(run_out)
return True
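A hypothetical driver for the helpers above, useful when exercising the build/publish flow outside FastAPI; the playbook path and all ProjectInfo field values are made up, and the field names other than name are inferred from their usage in this file.

from scripts.common.deploy import ProjectInfo, build_project, create_deploy_log, publish_remote

project = ProjectInfo(
    name="demo",
    git_url="git@example.com:demo/demo.git",  # assumed field, see build_project
    base_dir="/data/build",                   # assumed field
    host="10.0.0.10",                         # assumed field
    pre_script="make build",                  # assumed field
)
deploy_log = create_deploy_log("local-test")
tag_files = build_project(project, deploy_log, tag="v1.0.0")
if tag_files:
    publish_remote(project.host, "./playbook/1.yaml", {"source": tag_files}, deploy_log)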


@@ -2,8 +2,8 @@ import os
import time
from loguru import logger
basedir = os.path.dirname(os.path.dirname(
os.path.dirname(os.path.abspath(__file__))))
basedir = os.path.dirname(
os.path.dirname(os.path.abspath(__file__)))
# print(f"log basedir{basedir}") # /xxx/python_code/FastAdmin/backend/app
# build the path to the log file
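The hunk ends before the actual sink configuration; a minimal sketch of what such a loguru setup typically looks like, assuming a logs/ directory under basedir with daily rotation (the real settings in this repo are not shown).

# assumed continuation: create the log directory and attach a rotating file sink
log_path = os.path.join(basedir, 'logs')
os.makedirs(log_path, exist_ok=True)

log_file = os.path.join(log_path, f"{time.strftime('%Y-%m-%d')}.log")
logger.add(log_file, rotation="00:00", retention="20 days", encoding="utf-8", enqueue=True)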