接口写入redis,另一个脚本将REDIS数据写入数数
This commit is contained in:
parent
236bd72da2
commit
cbf285a6ce
@ -44,62 +44,18 @@ class DispatchHandler(tornado.web.RequestHandler):
|
|||||||
self._selfupJumpRecording()
|
self._selfupJumpRecording()
|
||||||
|
|
||||||
def _selfupJumpRecording(self):
|
def _selfupJumpRecording(self):
|
||||||
"""| gameid | int | 是 | 游戏ID |
|
|
||||||
| locationid | int | 是 | 广告位置ID |
|
|
||||||
| uid | int | 是 | 用户ID或微信openid |
|
|
||||||
| ad_channel | int | 是 | 广告特征码 见上ad_property.jump_param |
|
|
||||||
| adid | int | 是 | 广告ID |
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
post_data = self.request.body_arguments
|
post_data = self.request.body_arguments
|
||||||
post_data = {x: post_data.get(x)[0].decode("utf-8") for x in post_data.keys()}
|
post_data = {x: post_data.get(x)[0].decode("utf-8") for x in post_data.keys()}
|
||||||
if not post_data:
|
if not post_data:
|
||||||
post_data = self.request.body.decode('utf-8')
|
post_data = self.request.body.decode('utf-8')
|
||||||
post_data = json.loads(post_data)
|
post_data = json.loads(post_data)
|
||||||
gameid = post_data['gameid']
|
redis_key = f"ad::jumpRecording"
|
||||||
locationid = post_data['locationid']
|
my_redis.lpush(redis_key, post_data)
|
||||||
uid = post_data['uid']
|
return self.write({'errcode': 0, "errmsg": '', "message": ""})
|
||||||
ad_channel = post_data['ad_channel']
|
|
||||||
adid = post_data['adid']
|
|
||||||
print(f"gameid={gameid},locationid={locationid},uid={uid},ad_channel={ad_channel},adid={adid}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
result = {'errcode': 2, "errmsg": f"get args failed,{str(e)}"}
|
result = {'errcode': 2, "errmsg": f"get args failed,{str(e)}"}
|
||||||
log.error(result)
|
log.error(result)
|
||||||
return self.write({'errcode': 1, "errmsg": 'get args failed!'})
|
|
||||||
|
|
||||||
g = GetTgaConfig()
|
|
||||||
item = g.get_api_key(gameid)
|
|
||||||
if not (gameid and locationid and uid and ad_channel and adid):
|
|
||||||
log.error(f"")
|
|
||||||
return self.write({'errcode': 1, "errmsg": 'get args failed!'})
|
|
||||||
|
|
||||||
if not item.get('appid', None):
|
|
||||||
result = {'errcode': 2, "errmsg": f"get config via gameid failed ,gameid={gameid}"}
|
|
||||||
log.error(result)
|
|
||||||
return self.write({'errcode': 1, "errmsg": f'{gameid} not in define!'})
|
|
||||||
|
|
||||||
self.url = item['url']
|
|
||||||
self.tgaid = 1234
|
|
||||||
# self.tgaid = item['tgaid']
|
|
||||||
self.suffix = item.get('suffix', 0)
|
|
||||||
self.api_key = item.get('api_key', None)
|
|
||||||
self.tga = FromTga(self.url, self.api_key)
|
|
||||||
self.tga.init_tga_write(self.tgaid)
|
|
||||||
self.event_type = "jump_recording"
|
|
||||||
try:
|
|
||||||
tga_data = {}
|
|
||||||
tga_data['ad_channel'] = ad_channel
|
|
||||||
tga_data['account_id'] = uid
|
|
||||||
tga_data['gameid'] = gameid
|
|
||||||
tga_data['recordtime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
tga_data['locationid'] = locationid
|
|
||||||
tga_data['adid'] = adid
|
|
||||||
print(f"{tga_data}")
|
|
||||||
self.tga.put_event_data(tga_data, f'rep_{self.event_type}')
|
|
||||||
return self.write({'errcode': 0, "errmsg": '', "message": ""})
|
|
||||||
except Exception:
|
|
||||||
log.error(f"write 2 tga failed!", exc_info=True)
|
|
||||||
return ({'errcode': 2, "errmsg": f'write failed!'})
|
|
||||||
|
|
||||||
|
|
||||||
def _selfGetLocation(self):
|
def _selfGetLocation(self):
|
||||||
|
70
dump2shushu.py
Normal file
70
dump2shushu.py
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from myredis.myredis import my_redis
|
||||||
|
from ops.mtga import FromTga, GetTgaConfig
|
||||||
|
import datetime
|
||||||
|
from mysql.mmysql import MysqlBase
|
||||||
|
from prod_config import mysql_promotion_config
|
||||||
|
from log.mylog import define_logger
|
||||||
|
import logging
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
define_logger("/data/logs/dump2shshu.log")
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
redis_key = f"ad::jumpRecording"
|
||||||
|
|
||||||
|
|
||||||
|
def write2tga(data):
|
||||||
|
for gameid in data.keys():
|
||||||
|
g = GetTgaConfig()
|
||||||
|
item = g.get_api_key(gameid)
|
||||||
|
if not item.get('appid', None):
|
||||||
|
result = {'errcode': 2, "errmsg": f"get config via gameid failed ,gameid={gameid}"}
|
||||||
|
log.error(result)
|
||||||
|
return False
|
||||||
|
url = item['url']
|
||||||
|
tgaid = 1234
|
||||||
|
# tgaid = item['tgaid']
|
||||||
|
|
||||||
|
api_key = item.get('api_key', None)
|
||||||
|
tga = FromTga(url, api_key)
|
||||||
|
tga.init_tga_write(tgaid)
|
||||||
|
event_type = "jump_recording"
|
||||||
|
for line in data[gameid]:
|
||||||
|
try:
|
||||||
|
tga_data = {}
|
||||||
|
tga_data['ad_channel'] = line['ad_channel']
|
||||||
|
tga_data['account_id'] = line['uid']
|
||||||
|
tga_data['gameid'] = gameid
|
||||||
|
tga_data['recordtime'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
tga_data['locationid'] = ['locationid']
|
||||||
|
tga_data['adid'] = ['adid']
|
||||||
|
print(f"{tga_data}")
|
||||||
|
tga.put_event_data(tga_data, f'rep_{event_type}')
|
||||||
|
except Exception:
|
||||||
|
log.error(f"write 2 tga failed! data={tga_data}", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
|
def get_from_redis():
|
||||||
|
while 1:
|
||||||
|
data = defaultdict(set)
|
||||||
|
all = []
|
||||||
|
for i in range(1, 1000):
|
||||||
|
try:
|
||||||
|
all.append(my_redis.brpop(redis_key))
|
||||||
|
except:
|
||||||
|
log.error(f"get data failed!")
|
||||||
|
for one in all:
|
||||||
|
item = one['item']
|
||||||
|
data[item].add(one)
|
||||||
|
write2tga(data)
|
||||||
|
if my_redis.llen(redis_key) == 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
get_from_redis()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
x
Reference in New Issue
Block a user