myops/main.py
2021-06-21 17:38:32 +08:00

75 lines
1.9 KiB
Python

# uvicorn main:app --host=127.0.0.1 --port=8000 --reload
from fastapi import Depends, FastAPI, BackgroundTasks
from dependencies import get_query_token, get_token_header
# from routers import items, users
# from internal import admin
from ops.common import common
from typing import Optional
from scripts.common.redis import get_redis_pool
# import sys
# sys.path.append('.')
tags_metadata = [
# {
# "name": "common",
# "description": "Operations with users. The **login** logic is also here.",
# },
{
"name": "common",
"description": "Manage items. So _fancy_ they have their own docs.",
"externalDocs": {
"description": "Items external docs",
"url": "https://fastapi.tiangolo.com/",
},
},
]
app = FastAPI(dependencies=[Depends(get_token_header)],
openapi_tags=tags_metadata)
@app.on_event("startup")
async def startup_event():
app.state.redis = await get_redis_pool()
@app.on_event("shutdown")
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_close()
app.include_router(common.router)
@app.get("/")
async def root():
return {"message": "Hello Bigger Applications!"}
def write_log(message: str) -> True:
with open("log.txt", mode="a") as log:
log.write(message)
return True
def get_query(background_tasks: BackgroundTasks, q: Optional[str] = None):
if q:
message = f"found query:{q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)):
message = f"message to {email} \n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
if __name__ == '__main__':
import uvicorn
uvicorn.run(app='main:app', host="127.0.0.1",
port=8010, reload=True, debug=True)