init
This commit is contained in:
commit
55b57b9053
25
README.MD
Normal file
25
README.MD
Normal file
@ -0,0 +1,25 @@
|
||||
### 项目简介:
|
||||
|
||||
new project with fastapi
|
||||
|
||||
https://fastapi.tiangolo.com/zh/tutorial/bigger-applications/
|
||||
|
||||
采用异步方式实现,考虑并发性能
|
||||
|
||||
### 功能
|
||||
|
||||
1. 实现认证功能
|
||||
|
||||
2. 实现接口
|
||||
|
||||
3. 后端采用redis,mysql
|
||||
|
||||
|
||||
#### route分类
|
||||
> common
|
||||
mail -clearn
|
||||
> deploy
|
||||
aliyun cdn
|
||||
|
||||
> ops
|
||||
> data_collect
|
0
__init__.py
Normal file
0
__init__.py
Normal file
0
config/__init__.py
Normal file
0
config/__init__.py
Normal file
35
config/config.py
Normal file
35
config/config.py
Normal file
@ -0,0 +1,35 @@
|
||||
from pydantic import BaseSettings
|
||||
from typing import Optional
|
||||
import os
|
||||
|
||||
from pydantic.fields import T
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
app_name: str = "Kingsome API"
|
||||
admin_email: str = "pengtao@kingsome.cn"
|
||||
items_per_user: int = 50
|
||||
is_debug: bool = True
|
||||
|
||||
redis_host: str = "192.168.100.30" # reids 服务器IP
|
||||
redis_port: int = 6379 # redis 端口
|
||||
redis_db: int = 2 # redis db
|
||||
|
||||
mail_server: str = "smtp.exmail.qq.com" # 邮箱server
|
||||
mail_user: str = "ops@kingsome.cn" # 邮箱用户名
|
||||
mail_pswd: Optional[str] = os.getenv('mail_pswd') # 邮箱密码
|
||||
|
||||
x_token: str = "abc"
|
||||
|
||||
root_path_in_servers: Optional[bool] = False
|
||||
root_path: str = '/api/v1'
|
||||
|
||||
origins: list = [
|
||||
"http://*.kingsome.cn",
|
||||
"https://*.kingsome.cn",
|
||||
"http://localhost",
|
||||
"http://localhost:8080",
|
||||
]
|
||||
|
||||
|
||||
settings = Settings()
|
22
dependencies.py
Normal file
22
dependencies.py
Normal file
@ -0,0 +1,22 @@
|
||||
from fastapi import Header, HTTPException
|
||||
from pydantic.fields import T
|
||||
from starlette.requests import Request
|
||||
import pdb
|
||||
from config.config import settings
|
||||
|
||||
|
||||
async def get_token_header(request: Request, x_token: str = Header(...)):
|
||||
client = request.app.state.redis
|
||||
dd = await client.get('x_token')
|
||||
# pdb.set_trace()
|
||||
#print(dd, settings.x_token, x_token)
|
||||
if x_token == settings.x_token or x_token == dd:
|
||||
return True
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="X-Token header invalid")
|
||||
|
||||
|
||||
async def get_query_token(token: str):
|
||||
if token != "jessica":
|
||||
raise HTTPException(
|
||||
status_code=400, detail="No Jessica token provided")
|
BIN
jobs.sqlite
Normal file
BIN
jobs.sqlite
Normal file
Binary file not shown.
86
main.py
Normal file
86
main.py
Normal file
@ -0,0 +1,86 @@
|
||||
# uvicorn main:app --host=127.0.0.1 --port=8000 --reload
|
||||
from config.config import settings
|
||||
from fastapi import Depends, FastAPI, BackgroundTasks, Request
|
||||
from dependencies import get_token_header
|
||||
from ops.imgsmall import route as img_route
|
||||
|
||||
from typing import Optional
|
||||
from scripts.common.redis import get_redis_pool
|
||||
import pdb
|
||||
from scripts.logger import logger
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
# from apscheduler.events import EVENT_JOB_EXECUTED
|
||||
# from jobs.jobs import Schedule, job_execute
|
||||
tags_metadata = [
|
||||
# {
|
||||
# "name": "common",
|
||||
# "description": "Operations with users. The **login** logic is also here.",
|
||||
# },
|
||||
{
|
||||
"name": "common",
|
||||
"description": "Manage items. So _fancy_ they have their own docs.",
|
||||
"externalDocs": {
|
||||
"description": "Items external docs",
|
||||
"url": "https://fastapi.tiangolo.com/",
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def create_app():
|
||||
application = FastAPI(dependencies=[Depends(get_token_header)],
|
||||
openapi_tags=tags_metadata)
|
||||
application.include_router(img_route.router, prefix="/img")
|
||||
return application
|
||||
|
||||
|
||||
app = create_app()
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=settings.origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
app.state.redis = await get_redis_pool()
|
||||
# Schedule.start()
|
||||
# Schedule.add_listener(job_execute, EVENT_JOB_EXECUTED)
|
||||
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def shutdown_event():
|
||||
app.state.redis.close()
|
||||
await app.state.redis.wait_close()
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root(request: Request):
|
||||
redis_client = request.app.state.redis
|
||||
keys = await redis_client.get("online_devices")
|
||||
logger.info("get keys was {0} with {1}".format(keys, request.url))
|
||||
if keys:
|
||||
return {"message": "Hello Bigger Applications! {}".format(keys)}
|
||||
else:
|
||||
return {"message": "kajsk"}
|
||||
|
||||
|
||||
|
||||
# @app.post("/send-notification/{email}")
|
||||
# async def send_notification(request: Request, email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)):
|
||||
# redis_client = request.app.state.redis
|
||||
# keys = await redis_client.get("online_devices")
|
||||
# logger.info("get keys = {}".format(keys))
|
||||
# message = f"message to {email} \n"
|
||||
# background_tasks.add_task(write_log, message)
|
||||
# return {"message": "Message sent"}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import uvicorn
|
||||
uvicorn.run(app='main:app', host="127.0.0.1",
|
||||
port=8010, reload=True, debug=True)
|
0
ops/__init__.py
Normal file
0
ops/__init__.py
Normal file
0
ops/imgsmall/__init__.py
Normal file
0
ops/imgsmall/__init__.py
Normal file
42
ops/imgsmall/route.py
Normal file
42
ops/imgsmall/route.py
Normal file
@ -0,0 +1,42 @@
|
||||
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form
|
||||
from typing import List
|
||||
from pydantic import BaseModel
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import JSONResponse
|
||||
from config.config import settings
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/img")
|
||||
async def get_img(url:string)-> JSONResponse:
|
||||
|
||||
|
||||
# class EmailSchema(BaseModel):
|
||||
# email: List[EmailStr]
|
||||
# body: str
|
||||
# subject: str
|
||||
|
||||
|
||||
# MAIL_CONF = ConnectionConfig(
|
||||
# MAIL_USERNAME=settings.mail_user,
|
||||
# MAIL_PASSWORD=settings.mail_pswd,
|
||||
# MAIL_FROM=settings.mail_user,
|
||||
# MAIL_PORT='465',
|
||||
# MAIL_SERVER=settings.mail_server,
|
||||
# MAIL_TLS=False,
|
||||
# MAIL_SSL=True,
|
||||
# # USE_CREDENTIALS=True
|
||||
# )
|
||||
|
||||
|
||||
@router.post("/email")
|
||||
async def simple_send(email: EmailSchema) -> JSONResponse:
|
||||
message = MessageSchema(
|
||||
subject=email.dict().get("subject"),
|
||||
# List of recipients, as many as you can pass
|
||||
recipients=email.dict().get("email"),
|
||||
body="<html>{}</html>".format(email.dict().get("body")),
|
||||
subtype="html"
|
||||
)
|
||||
fm = FastMail(MAIL_CONF)
|
||||
await fm.send_message(message)
|
||||
return JSONResponse(status_code=200, content={"message": "email has been sent"})
|
7
requests.txt
Normal file
7
requests.txt
Normal file
@ -0,0 +1,7 @@
|
||||
aioredis==1.3.1
|
||||
fastapi==0.65.2
|
||||
fastapi-mail==0.3.7
|
||||
GitPython==3.1.18
|
||||
pydantic==1.8.2
|
||||
redis==3.2.1
|
||||
uvicorn==0.15.0
|
0
scripts/__init__.py
Normal file
0
scripts/__init__.py
Normal file
26
scripts/common/__init__.py
Normal file
26
scripts/common/__init__.py
Normal file
@ -0,0 +1,26 @@
|
||||
import subprocess
|
||||
from scripts.log import logger
|
||||
|
||||
|
||||
@logger.catch
|
||||
def run_cmd(cmd, shell=True, timeout=120):
|
||||
'''
|
||||
Run command with arguments, wait to complete and return ``True`` on success.
|
||||
|
||||
:param cls: The class as implicit first argument.
|
||||
:param cmd: Command string to be executed.
|
||||
:returns : ``True`` on success, otherwise ``None``.
|
||||
:rtype : ``bool``
|
||||
'''
|
||||
logger.debug('Execute command: {0}'.format(cmd))
|
||||
status = True
|
||||
try:
|
||||
out_bytes = subprocess.check_output(
|
||||
cmd, shell=shell, stderr=subprocess.STDOUT, timeout=timeout)
|
||||
except subprocess.CalledProcessError as e:
|
||||
out_bytes = e.output # Output generated before error
|
||||
code = e.returncode # Return code
|
||||
logger.error(f"run {cmd} failed,out={out_bytes},code={code}")
|
||||
status = False
|
||||
|
||||
return out_bytes, status
|
9
scripts/common/get_img.py
Normal file
9
scripts/common/get_img.py
Normal file
@ -0,0 +1,9 @@
|
||||
import shutil
|
||||
import requests
|
||||
def get_img(url):
|
||||
url = 'http://imgsmall.dmzj.com/h/59665/121592/0.jpg'
|
||||
headers = {'Referer': 'https://www.dmzj.com/'}
|
||||
response = requests.get(url=url, headers=headers, stream=True)
|
||||
with open('0.jpg', 'wb') as out_file:
|
||||
shutil.copyfileobj(response.raw, out_file)
|
||||
del response
|
14
scripts/common/redis.py
Normal file
14
scripts/common/redis.py
Normal file
@ -0,0 +1,14 @@
|
||||
from aioredis import create_redis_pool, Redis
|
||||
from config.config import settings
|
||||
|
||||
# settings = {
|
||||
# "redis_host": "192.168.100.30",
|
||||
# "redis_port": 6379,
|
||||
# "redis_db": 1,
|
||||
|
||||
# }
|
||||
|
||||
|
||||
async def get_redis_pool() -> Redis:
|
||||
redis = await create_redis_pool("redis://{ip}:{port}/{db}?encoding=utf-8".format(ip=settings.redis_host, port=settings.redis_port, db=settings.redis_db))
|
||||
return redis
|
20
scripts/logger.py
Normal file
20
scripts/logger.py
Normal file
@ -0,0 +1,20 @@
|
||||
import os
|
||||
import time
|
||||
from loguru import logger
|
||||
|
||||
basedir = os.path.dirname(
|
||||
os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
# print(f"log basedir{basedir}") # /xxx/python_code/FastAdmin/backend/app
|
||||
# 定位到log日志文件
|
||||
log_path = os.path.join(basedir, 'logs')
|
||||
|
||||
if not os.path.exists(log_path):
|
||||
os.mkdir(log_path)
|
||||
|
||||
log_path_error = os.path.join(
|
||||
log_path, f'{time.strftime("%Y-%m-%d")}_error.log')
|
||||
|
||||
# 日志简单配置
|
||||
# 具体其他配置 可自行参考 https://github.com/Delgan/loguru
|
||||
logger.add(log_path_error, rotation="12:00", retention="5 days", enqueue=True)
|
30
test.py
Normal file
30
test.py
Normal file
@ -0,0 +1,30 @@
|
||||
from fastapi.testclient import TestClient
|
||||
from ops.common.common import EmailSchema
|
||||
from main import app
|
||||
import time
|
||||
import pdb
|
||||
import json
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
def test_read_main():
|
||||
response = client.get("/")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"message": "Hello Bigger Applications!"}
|
||||
|
||||
|
||||
def test_post_mail():
|
||||
data = {"email": ['pengtao@kingsome.cn'], "body": 'test message!',
|
||||
"subject": 'ttt {}'.format(time.time())}
|
||||
response = client.post("/common/email", headers={"X-Token": "fake-super-scret-token", "accept": "application/json", "Content-Type": "application/json"},
|
||||
data=json.dumps(data),
|
||||
)
|
||||
# data=EmailSchema(**data).json()
|
||||
print(response)
|
||||
# pdb.set_trace()
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"message": "email has been sent"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_post_mail()
|
Loading…
x
Reference in New Issue
Block a user