jump_collect/scripts/sync_data.py
2021-11-23 18:04:58 +08:00

65 lines
1.7 KiB
Python

import json
from pydantic.fields import T
from config.config import settings
import requests
from starlette.requests import Request
from starlette.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from scripts.logger import logger
async def get_jump_data(db) -> bool:
platform_info = get_platform()
platforms = platform_info.get('data')
for platform in platforms:
platformAlias = platform.get('platformAlias')
existing_platform = await db["platform"].find_one(
{"platformAlias": platformAlias})
if not existing_platform:
platform = jsonable_encoder(a)
new_platform = await db["platform"].insert_one(platform)
created_platform = await db["platform"].find_one(
{"_id": new_platform.inserted_id})
logger.info(f"insert platform {created_platform}")
else:
logger.info(f"key found with {existing_platform}")
# all_game = get_all_game()
# #gameids = all_game['data']
# print(all_game)
return True
# for gameid in gameids:
# game_info = get_game_info(gameid)
def get_platform() -> json:
info = get_url_data(settings.platform_url)
return info
def get_all_game() -> json:
info = get_url_data(settings.all_game_url)
print(info)
return None
def get_game_info(gameid: int) -> dict:
pass
def get_game_price(gameid: int) -> dict:
pass
def get_history_price(gameid) -> dict:
pass
def get_url_data(url: str, data=None) -> json:
requests.adapters.DEFAULT_RETRIES = 5
s = requests.session()
s.keep_alive = False
headers = {'Connection': 'close'}
res = s.get(url=url, params=data, timeout=10, headers=headers).json()
return res