base model

This commit is contained in:
pengtao 2021-06-21 12:07:23 +08:00
parent 5bbbb1d680
commit 20313358c3
8 changed files with 132 additions and 0 deletions

0
__init__.py Normal file
View File

12
dependencies.py Normal file
View File

@ -0,0 +1,12 @@
from fastapi import Header, HTTPException
async def get_token_header(x_token: str = Header(...)):
if x_token != "fake-super-scret-token":
raise HTTPException(status_code=400, detail="X-Token header invalid")
async def get_query_token(token: str):
if token != "jessica":
raise HTTPException(
status_code=400, detail="No Jessica token provided")

0
internal/__inti__.py Normal file
View File

7
internal/admin.py Normal file
View File

@ -0,0 +1,7 @@
from fastapi import APIRouter
router = APIRouter()
@router.post("/")
async def update_admin():
return {"message": "Admin getting schwifty"}

64
main.py Normal file
View File

@ -0,0 +1,64 @@
# cd .. && uvicorn myops.main:app --reload
from fastapi import Depends, FastAPI, BackgroundTasks
from .dependencies import get_query_token, get_token_header
from .routers import items, users
from .internal import admin
from typing import Optional
tags_metadata = [
{
"name": "users",
"description": "Operations with users. The **login** logic is also here.",
},
{
"name": "items",
"description": "Manage items. So _fancy_ they have their own docs.",
"externalDocs": {
"description": "Items external docs",
"url": "https://fastapi.tiangolo.com/",
},
},
]
# app = FastAPI(dependencies=[Depends(get_token_header)], title="My ops project",
# description="This is some interface for ops in kingsome!", version="1.0.1")
app = FastAPI(dependencies=[Depends(get_token_header)],
openapi_tags=tags_metadata)
app.include_router(users.router)
app.include_router(items.router)
app.include_router(
admin.router,
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_token_header)],
responses={418: {"description": "I`m a teapot"}}
)
app.get("/")
async def root():
return {"message": "Hello Bigger Applications!"}
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Optional[str] = None):
if q:
message = f"found query:{q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)):
message = f"message to {email} \n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}

0
routers/__init__.py Normal file
View File

32
routers/items.py Normal file
View File

@ -0,0 +1,32 @@
from fastapi import APIRouter, Depends, HTTPException
from ..dependencies import get_token_header
router = APIRouter(
prefix="/items",
tags=["item"],
dependencies=[Depends(get_token_header)],
responses={404: {"description": "Not found"}},
)
fake_item_db = {"plumbus": {"name": "Plumbus", "gun": {"name": "Portal Gun"}}}
@router.get("/")
async def read_items():
return fake_item_db
@router.get("/{itemid}")
async def read_item(item_id: str):
if item_id not in fake_item_db:
raise HTTPException(status_code=404, detail="Item not found")
return {"name": fake_item_db[item_id]["name"], "item_id": item_id}
@router.put("/{item_id}", tags=["custom"], responses={403: {"description": "Operation forbidden"}})
async def update_item(item_id: str):
if item_id != "pulmbus":
raise HTTPException(
status_code=403, detail="You can only update the item plumbus")
return {"item_id": item_id, "name": "The great plumbus"}

17
routers/users.py Normal file
View File

@ -0,0 +1,17 @@
from fastapi import APIRouter
router = APIRouter()
@router.get('/users/', tags=["users"])
async def read_users():
return [{"username": "Rick"}, {"username": "Morty"}]
@router.get("/user/me", tags=["users"])
async def read_user_me():
return {"username": "fakecurrentuser"}
@router.get("/user/{username}", tags=["users"])
async def read_user(username: str):
return {"username": username}