import json from pydantic.fields import T from config.config import settings import requests from scripts.logger import logger import time async def get_jump_data(db) -> bool: if not await get_platform(db): logger.error("get platform info failed!") else: if not await get_all_game(db): logger.error("get game list failed!") return True async def get_platform(db) -> json: info = get_url_data(settings.platform_url) for platform in info.get('data'): platformAlias = platform.get('platformAlias') existing_platform = await db["platform"].find_one( {"platformAlias": platformAlias}) if not existing_platform: #platform = jsonable_encoder(platform) new_platform = await db["platform"].insert_one(platform) created_platform = await db["platform"].find_one( {"_id": new_platform.inserted_id}) logger.info(f"insert platform {created_platform}") else: logger.info(f"key found with {existing_platform}") return True async def get_all_game(db) -> json: ids = await db['platform'].find({}, { "_id": 0, "moduleId": 1, "gameNum": 1 }).to_list(10) for id in ids: platformid = id.get("moduleId") nums = int(id.get("gameNum")) // 10 + 1 for i in range(0, nums): url = settings.all_game_url.format(offset=i, platformid=platformid) info = get_url_data(url) #logger.debug(info.get("data")) ll = len(info.get("data")) logger.debug(f"collect {nums}\t, {i},{url},{ll}") if info.get("data"): for item in info.get("data"): gameid = item.get("oldGameId", 0) existing_gameid = await db["gameinfo"].find_one( {"oldGameId": gameid}) if not existing_gameid: #logger.debug(item) new_gameinfo = await db["gameinfo"].insert_one(item) created_gameinfo = await db["gameinfo"].find_one( {"_id": new_gameinfo.inserted_id}) # logger.info(f"insert gameinfo {created_gameinfo}") else: # logger.info( # f"key found with {existing_gameid} in gameinfo") await db["gameinfo"].update_one({"oldGameId": gameid}, {"$set": item}) else: break return True def get_game_info(gameid: int) -> dict: pass def get_game_price(gameid: int) -> dict: pass def get_history_price(gameid) -> dict: pass def get_url_data(url: str, data=None) -> json: requests.adapters.DEFAULT_RETRIES = 5 s = requests.session() s.keep_alive = False headers = {'Connection': 'close'} time.sleep(1) res = s.get(url=url, params=data, timeout=10, headers=headers).json() return res