# -*- coding: utf-8 -*- from myredis.myredis import my_redis from ops.mtga import FromTga, GetTgaConfig import datetime from mysql.mmysql import MysqlBase from prod_config import mysql_promotion_config from log.mylog import define_logger import logging from collections import defaultdict import json define_logger("/data/logs/dump2shshu.log") import pdb log = logging.getLogger(__name__) redis_key = f"ad::jumpRecording" def write2tga(data): print(f"3 {data}") for gameid in data.keys(): g = GetTgaConfig() item = g.get_api_key(gameid) if not item.get('appid', None): result = {'errcode': 2, "errmsg": f"get config via gameid failed ,gameid={gameid}"} log.error(result) return False url = item['url'] tgaid = 1234 # tgaid = item['tgaid'] api_key = item.get('api_key', None) tga = FromTga(url, api_key) tga.init_tga_write(tgaid) event_type = "jump_recording" for new in list(data[gameid]): try: line=json.loads(new) tga_data = {} tga_data['ad_channel'] = line['ad_channel'] tga_data['account_id'] = line['uid'] tga_data['gameid'] = gameid tga_data['recordtime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") tga_data['locationid'] = line['locationid'] tga_data['adid'] = line['adid'] print(f"{tga_data}") tga.put_event_data(tga_data, f'rep_{event_type}') except Exception: log.error(f"write 2 tga failed! data={tga_data}", exc_info=True) def get_from_redis(): while 1: data = defaultdict(set) for i in range(1, 1000): try: one_line = my_redis.lpop(redis_key) if one_line: print(f"get {one_line}") new = json.loads(one_line) item = new['gameid'] data[item].add(one_line) else: continue except Exception: log.error(f"get data failed!", exc_info=True) print(f"2={data}") write2tga(data) if my_redis.llen(redis_key) == 0: break def main(): get_from_redis() if __name__ == "__main__": main()