133 lines
5.1 KiB
Python
133 lines
5.1 KiB
Python
import time
|
|
from typing import Union
|
|
from datetime import datetime
|
|
from fastapi import FastAPI, Query, Body, APIRouter
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from scripts.logger import logger
|
|
from jobs.jobs import Schedule, aps_test1
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# 简单定义返回
|
|
|
|
|
|
def resp_ok(*, code=0, msg="ok", data: Union[list, dict, str] = None) -> dict:
|
|
return {"code": code, "msg": msg, "data": data}
|
|
|
|
|
|
def resp_fail(*, code=1, msg="fail", data: Union[list, dict, str] = None):
|
|
return {"code": code, "msg": msg, "data": data}
|
|
|
|
|
|
def cron_task(a1: str) -> None:
|
|
print(a1, time.strftime("'%Y-%m-%d %H:%M:%S'"))
|
|
|
|
|
|
def app_test01(msg: str = "hi"):
|
|
logger.info(f"run with {msg}")
|
|
|
|
|
|
@router.get("/all", tags=["schedule"], summary="获取所有job信息")
|
|
async def get_scheduled_syncs():
|
|
"""
|
|
获取所有job
|
|
:return:
|
|
"""
|
|
schedules = []
|
|
for job in Schedule.get_jobs():
|
|
schedules.append(
|
|
{"job_id": job.id, "func_name": job.func_ref, "func_args": job.args, "cron_model": str(job.trigger),
|
|
"next_run": str(job.next_run_time)}
|
|
)
|
|
return resp_ok(data=schedules)
|
|
|
|
|
|
@router.get("/once", tags=["schedule"], summary="获取指定的job信息")
|
|
async def get_target_sync(
|
|
job_id: str = Query("job1", title="任务id")
|
|
):
|
|
job = Schedule.get_job(job_id=job_id)
|
|
|
|
if not job:
|
|
return resp_fail(msg=f"not found job {job_id}")
|
|
|
|
return resp_ok(
|
|
data={"job_id": job.id, "func_name": job.func_ref, "func_args": job.args, "cron_model": str(job.trigger),
|
|
"next_run": str(job.next_run_time)})
|
|
|
|
|
|
# interval 固定间隔时间调度
|
|
@router.post("/interval/schedule/", tags=["schedule"], summary="开启定时:间隔时间循环")
|
|
async def add_interval_job(seconds: int = Body(120, title="循环间隔时间/秒,默认120s", embed=True),
|
|
job_id: str = Body(..., title="任务id", embed=True),
|
|
run_time: int = Body(time.time(), title="第一次运行时间",
|
|
description="默认立即执行", embed=True)
|
|
):
|
|
res = Schedule.get_job(job_id=job_id)
|
|
if res:
|
|
return resp_fail(msg=f"{job_id} job already exists")
|
|
schedule_job = Schedule.add_job(app_test01,
|
|
'interval',
|
|
args=(job_id,),
|
|
seconds=seconds, # 循环间隔时间 秒
|
|
id=job_id, # job ID
|
|
next_run_time=datetime.fromtimestamp(
|
|
run_time) # 立即执行
|
|
)
|
|
return resp_ok(data={"job_id": schedule_job.id})
|
|
|
|
|
|
# date 某个特定时间点只运行一次
|
|
@router.post("/date/schedule/", tags=["schedule"], summary="开启定时:固定只运行一次时间")
|
|
async def add_date_job(cronname: str, args: tuple,
|
|
run_time: str = Body(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), title="时间戳", description="固定只运行一次时间", embed=True),
|
|
job_id: str = Body(..., title="任务id", embed=True),
|
|
):
|
|
res = Schedule.get_job(job_id=job_id)
|
|
if res:
|
|
return resp_fail(msg=f"{job_id} job already exists")
|
|
schedule_job = Schedule.add_job(cronname,
|
|
'date',
|
|
args=args,
|
|
run_date=datetime.strptime(
|
|
run_time, '%Y-%m-%d %H:%M:%S'),
|
|
id=job_id, # job ID
|
|
)
|
|
logger.info("add job {}".format(schedule_job.id))
|
|
return resp_ok(data={"job_id": schedule_job.id})
|
|
|
|
|
|
# cron 更灵活的定时任务 可以使用crontab表达式
|
|
@router.post("/cron/schedule/", tags=["schedule"], summary="开启定时:crontab表达式")
|
|
async def add_cron_job(
|
|
job_id: str = Body(..., title="任务id", embed=True),
|
|
crontab: str = Body('*/1 * * * *', title="crontab 表达式"),
|
|
run_time: int = Body(time.time(), title="第一次运行时间",
|
|
description="默认立即执行", embed=True)
|
|
):
|
|
res = Schedule.get_job(job_id=job_id)
|
|
if res:
|
|
return resp_fail(msg=f"{job_id} job already exists")
|
|
schedule_job = Schedule.add_job(cron_task,
|
|
CronTrigger.from_crontab(crontab),
|
|
args=(job_id,),
|
|
id=job_id, # job ID
|
|
next_run_time=datetime.fromtimestamp(
|
|
run_time)
|
|
)
|
|
|
|
return resp_ok(data={"job_id": schedule_job.id})
|
|
|
|
|
|
@router.post("/del", tags=["schedule"], summary="移除任务")
|
|
async def remove_schedule(
|
|
job_id: str = Body(..., title="任务id", embed=True)
|
|
):
|
|
res = Schedule.get_job(job_id=job_id)
|
|
if not res:
|
|
return resp_fail(msg=f"not found job {job_id}")
|
|
Schedule.remove_job(job_id)
|
|
logger.info("remove job with id={}".format(job_id))
|
|
return resp_ok()
|