jump_collect/scripts/sync_data.py
pengtao 5844564b25 1
2021-11-25 14:02:36 +08:00

94 lines
3.0 KiB
Python

import json
from pydantic.fields import T
from config.config import settings
import requests
from scripts.logger import logger
import time
async def get_jump_data(db) -> bool:
if not await get_platform(db):
logger.error("get platform info failed!")
else:
if not await get_all_game(db):
logger.error("get game list failed!")
return True
async def get_platform(db) -> json:
info = get_url_data(settings.platform_url)
for platform in info.get('data'):
platformAlias = platform.get('platformAlias')
existing_platform = await db["platform"].find_one(
{"platformAlias": platformAlias})
if not existing_platform:
#platform = jsonable_encoder(platform)
new_platform = await db["platform"].insert_one(platform)
created_platform = await db["platform"].find_one(
{"_id": new_platform.inserted_id})
logger.info(f"insert platform {created_platform}")
else:
logger.info(f"key found with {existing_platform}")
return True
async def get_all_game(db) -> json:
ids = await db['platform'].find({}, {
"_id": 0,
"moduleId": 1,
"gameNum": 1
}).to_list(10)
for id in ids:
platformid = id.get("moduleId")
nums = int(id.get("gameNum")) // 10 + 1
for i in range(0, nums):
url = settings.all_game_url.format(offset=i, platformid=platformid)
info = get_url_data(url)
#logger.debug(info.get("data"))
ll = len(info.get("data"))
logger.debug(f"collect {nums}\t, {i},{url},{ll}")
if info.get("data"):
for item in info.get("data"):
gameid = item.get("oldGameId", 0)
existing_gameid = await db["gameinfo"].find_one(
{"oldGameId": gameid})
if not existing_gameid:
#logger.debug(item)
new_gameinfo = await db["gameinfo"].insert_one(item)
created_gameinfo = await db["gameinfo"].find_one(
{"_id": new_gameinfo.inserted_id})
# logger.info(f"insert gameinfo {created_gameinfo}")
else:
# logger.info(
# f"key found with {existing_gameid} in gameinfo")
await db["gameinfo"].update_one({"oldGameId": gameid},
{"$set": item})
else:
break
return True
def get_game_info(gameid: int) -> dict:
pass
def get_game_price(gameid: int) -> dict:
pass
def get_history_price(gameid) -> dict:
pass
def get_url_data(url: str, data=None) -> json:
requests.adapters.DEFAULT_RETRIES = 5
s = requests.session()
s.keep_alive = False
headers = {'Connection': 'close'}
time.sleep(1)
res = s.get(url=url, params=data, timeout=10, headers=headers).json()
return res