myops/jobs/route.py
pengtao 9e4adf241f 1
2021-07-06 17:24:30 +08:00

133 lines
5.1 KiB
Python

import time
from typing import Union
from datetime import datetime
from fastapi import FastAPI, Query, Body, APIRouter
from apscheduler.triggers.cron import CronTrigger
from scripts.logger import logger
from jobs.jobs import Schedule, aps_test1
router = APIRouter()
# 简单定义返回
def resp_ok(*, code=0, msg="ok", data: Union[list, dict, str] = None) -> dict:
return {"code": code, "msg": msg, "data": data}
def resp_fail(*, code=1, msg="fail", data: Union[list, dict, str] = None):
return {"code": code, "msg": msg, "data": data}
def cron_task(a1: str) -> None:
print(a1, time.strftime("'%Y-%m-%d %H:%M:%S'"))
def app_test01(msg: str = "hi"):
logger.info(f"run with {msg}")
@router.get("/all", tags=["schedule"], summary="获取所有job信息")
async def get_scheduled_syncs():
"""
获取所有job
:return:
"""
schedules = []
for job in Schedule.get_jobs():
schedules.append(
{"job_id": job.id, "func_name": job.func_ref, "func_args": job.args, "cron_model": str(job.trigger),
"next_run": str(job.next_run_time)}
)
return resp_ok(data=schedules)
@router.get("/once", tags=["schedule"], summary="获取指定的job信息")
async def get_target_sync(
job_id: str = Query("job1", title="任务id")
):
job = Schedule.get_job(job_id=job_id)
if not job:
return resp_fail(msg=f"not found job {job_id}")
return resp_ok(
data={"job_id": job.id, "func_name": job.func_ref, "func_args": job.args, "cron_model": str(job.trigger),
"next_run": str(job.next_run_time)})
# interval 固定间隔时间调度
@router.post("/interval/schedule/", tags=["schedule"], summary="开启定时:间隔时间循环")
async def add_interval_job(seconds: int = Body(120, title="循环间隔时间/秒,默认120s", embed=True),
job_id: str = Body(..., title="任务id", embed=True),
run_time: int = Body(time.time(), title="第一次运行时间",
description="默认立即执行", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
schedule_job = Schedule.add_job(app_test01,
'interval',
args=(job_id,),
seconds=seconds, # 循环间隔时间 秒
id=job_id, # job ID
next_run_time=datetime.fromtimestamp(
run_time) # 立即执行
)
return resp_ok(data={"job_id": schedule_job.id})
# date 某个特定时间点只运行一次
@router.post("/date/schedule/", tags=["schedule"], summary="开启定时:固定只运行一次时间")
async def add_date_job(cronname: str, args: tuple,
run_time: str = Body(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), title="时间戳", description="固定只运行一次时间", embed=True),
job_id: str = Body(..., title="任务id", embed=True),
):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
schedule_job = Schedule.add_job(cronname,
'date',
args=args,
run_date=datetime.strptime(
run_time, '%Y-%m-%d %H:%M:%S'),
id=job_id, # job ID
)
logger.info("add job {}".format(schedule_job.id))
return resp_ok(data={"job_id": schedule_job.id})
# cron 更灵活的定时任务 可以使用crontab表达式
@router.post("/cron/schedule/", tags=["schedule"], summary="开启定时:crontab表达式")
async def add_cron_job(
job_id: str = Body(..., title="任务id", embed=True),
crontab: str = Body('*/1 * * * *', title="crontab 表达式"),
run_time: int = Body(time.time(), title="第一次运行时间",
description="默认立即执行", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if res:
return resp_fail(msg=f"{job_id} job already exists")
schedule_job = Schedule.add_job(cron_task,
CronTrigger.from_crontab(crontab),
args=(job_id,),
id=job_id, # job ID
next_run_time=datetime.fromtimestamp(
run_time)
)
return resp_ok(data={"job_id": schedule_job.id})
@router.post("/del", tags=["schedule"], summary="移除任务")
async def remove_schedule(
job_id: str = Body(..., title="任务id", embed=True)
):
res = Schedule.get_job(job_id=job_id)
if not res:
return resp_fail(msg=f"not found job {job_id}")
Schedule.remove_job(job_id)
logger.info("remove job with id={}".format(job_id))
return resp_ok()