# -*- coding: utf-8 -*- from ops.mmongo import MongodbBase import pdb # mongodb://10.10.5.4/taptap games # { # "_id": "5d91abaa7ed6a01e9824f941", # "tags": ["魔性", "io", "休闲"], # "source": "taptap", # "cateName": "download", # "icon": "https://img.tapimg.com/market/lcs/601735dfe48b2fbac327851d75b0dc72_360.png?imageMogr2/auto-orient/strip", # "order": 1, # "title": "我的大刀四十米(测试版)", # "author": "厂商: 睡神飞工作室", # "score": 8.3, # "desc": "明明可以靠抄袭,偏偏却要死磕原创。我不知道明明是谁,反正我们是偏偏。全新玩法io类游戏来啦!魔性欢快地砍来砍去吧!开局一把刀越砍刀越长谁挡我砍谁我长我有理不服来砍我睡神飞工作室继《我飞刀玩得贼6》之后又一新品!魔性翻倍!快乐翻倍!", # "cate": "休闲", # "gameid": 176279, # "date": "2019-09-30", # "watch": 65640, # "reserve": 0, # "sell": 0, # "download": 0, # "review": 199, # "topic": 36, # "__v": 0, # "createdAt": "2019-09-30T07:15:54.041Z", # "updatedAt": "2019-09-30T07:15:54.041Z" # } # event_3_3_8002 = {} # event_3_3_8002['db'] = "beagle-production" # event_3_3_8002['coll'] = "compay_record" # event_3_3_8002['event_name'] = "event_3_3" # event_3_3_8002['gameid'] = 8002 # event_3_3_8002['pipeline'] = [{'$match': {'createdAt': {'$gte': start_date, '$lt': end_date}}}, # {"$unwind": "$wechat_pay_result"}, { # '$project': {'money_records': 1, 'pay_type': 1, 'account': 1, 'status': 1, 'money': 1, 'createdAt': 1, # 'updatedAt': 1, 'return_code': '$wechat_pay_result.return_code', # 'result_code': '$wechat_pay_result.result_code', 'return_msg': '$wechat_pay_result.return_msg', # 'payment_no': '$wechat_pay_result.payment_no', 'payment_time': '$wechat_pay_result.payment_time', # 'original_msg': '$wechat_pay_result.original_msg', 'payment_no': 1, 'payment_time': 1, # 'distinct_id': '$_id', 'object_id': '$_id', '_id': 0}}] # # # { # "_id" : ObjectId("5d92b4f713c3487bd374c691"), # "tags" : [ # "魔性", # "io", # "休闲" # ], # "source" : "taptap", # "cateName" : "download", # "icon" : "https://img.tapimg.com/market/lcs/601735dfe48b2fbac327851d75b0dc72_360.png?imageMogr2/auto-orient/strip", # "order" : 1, # "title" : "我的大刀四十米(测试版)", # "author" : "厂商: 睡神飞工作室", # "score" : 8.6, # "desc" : "明明可以靠抄袭,偏偏却要死磕原创。我不知道明明是谁,反正我们是偏偏。全新玩法io类游戏来啦!魔性欢快地砍来砍去吧!开局一把刀越砍刀越长谁挡我砍谁我长我有理不服来砍我睡神飞工作室继《我飞刀玩得贼6》之后又一新品!魔性翻倍!快乐翻倍!", # "cate" : "休闲", # "gameid" : 176279, # "date" : "2019-09-30", # "watch" : 118674, # "reserve" : 0, # "sell" : 0, # "download" : 0, # "review" : 274, # "topic" : 49, # "__v" : 0, # "createdAt" : ISODate("2019-10-01T02:07:51.256Z"), # "updatedAt" : ISODate("2019-10-01T02:07:51.256Z") # } from ops.mtga import FromTga from ops.plog import define_logger import logging import datetime import sys from bson.objectid import ObjectId import json define_logger("/data/logs/ops/reptile2ss.log") log = logging.getLogger(__name__) MONGOINFO = {'host': '10.10.5.6', 'port': 27017} conver_float = ('score',) conver_int = ('order', 'gameid', 'watch', 'reserve', 'sell', 'download', 'review', 'topic') class CollectMongo(): def __init__(self, item): self.url = "http://10.10.3.17:8992/querySql" self.api_secret = "n9H4R32ZcjtSeN89ljCY6ESzTmOlnwwnOB3r4YsggnP5M1AXLtKtiS4sS1KKLOEQ" self.tga = FromTga(url=self.url, token=self.api_secret) self.item = item self.gameid = self.item.get('gameid') self.tgaid = self.item.get('tgaid') # 初始化tga写方法初始化 self.tga.init_tga_write(self.tgaid) def get_data_from_mongo(self): dbset = MONGOINFO.copy() dbset['dbname'] = self.item.get('db') dbset['collname'] = self.item.get('coll') m = MongodbBase(**dbset) data = list(m.query(self.item.get('pipeline'))) # print(data) return data def write2ss(self, data): event_name = self.item.get('event_name') run_false = False try: nums = len(data) except: nums = 0 for line in data: if line: for key in line.keys(): if isinstance(line[key], ObjectId): line[key] = str(line[key]) # line[key]=ObjectId(base64.b64decode(key)) elif isinstance(line[key], (list, tuple, dict)): line[key] = json.dumps(line[key]) line['account_id'] = str(line['object_id']) line['#time'] = line['createdAt'] try: line['tags'] = ",".join(json.loads(line['tags'])) except Exception: log.error(f"correct {line['tags']} Failed") line['tags'] = "" for item in conver_float: try: line[item] = float(line[item]) except Exception: log.error(f"correct {line[item]} Failed") line[item] = 0 for item in conver_int: try: line[item] = int(line[item]) except Exception: log.error(f"correct {line[item]} Failed") line[item] = 0 if not self.tga.put_event_data(line, event_name): log.error("write {}_{} event error! {}\n".format(event_name, self.tgaid, line)) run_false = True if not run_false: log.info("write {}_{} num={} success!\n".format(event_name, self.tgaid, nums)) def run(self): data = self.get_data_from_mongo() self.write2ss(data) def read_mongo(start_date, end_date): print(f"start ={start_date},end={end_date}") all_type = [] reptile_4_1 = {} reptile_4_1['db'] = "taptap" reptile_4_1['coll'] = "games" reptile_4_1['event_name'] = "reptile_4_1" # reptile_4_1['gameid'] = 9999 reptile_4_1['tgaid'] = 6 reptile_4_1['pipeline'] = [{'$match': {'createdAt': {'$gte': start_date, '$lt': end_date}}}, { '$project': {'tags': 1, 'source': 1, 'cateName': 1, 'icon': 1, 'order': 1, 'title': 1, 'author': 1, 'score': 1, 'cate': 1, 'gameid': 1, 'watch': 1, 'reserve': 1, 'sell': 1, 'download': 1, 'review': 1, 'topic': 1, 'createdAt': 1, 'updatedAt': 1, 'object_id': '$_id', '_id': 0}}] all_type.append(reptile_4_1) for item in all_type: cc = CollectMongo(item) cc.run() if __name__ == "__main__": if len(sys.argv) == 2: try: d = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d") except Exception: log.error(f"split args with start time failed ,args={d}", exc_info=True) raise Exception(f"split args with start time failed ,args={d}") else: d = datetime.datetime.now() - datetime.timedelta(days=1) start_date = datetime.datetime(d.year, d.month, d.day) end_date = datetime.datetime(d.year, d.month, d.day, 23, 59, 59) read_mongo(start_date, end_date)