datacollect/taptap/collect_mongodb.py

195 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from ops.mmongo import MongodbBase
import pdb
# mongodb://10.10.5.4/taptap games
# {
# "_id": "5d91abaa7ed6a01e9824f941",
# "tags": ["魔性", "io", "休闲"],
# "source": "taptap",
# "cateName": "download",
# "icon": "https://img.tapimg.com/market/lcs/601735dfe48b2fbac327851d75b0dc72_360.png?imageMogr2/auto-orient/strip",
# "order": 1,
# "title": "我的大刀四十米(测试版)",
# "author": "厂商: 睡神飞工作室",
# "score": 8.3,
# "desc": "明明可以靠抄袭偏偏却要死磕原创。我不知道明明是谁反正我们是偏偏。全新玩法io类游戏来啦魔性欢快地砍来砍去吧开局一把刀越砍刀越长谁挡我砍谁我长我有理不服来砍我睡神飞工作室继《我飞刀玩得贼6》之后又一新品魔性翻倍快乐翻倍",
# "cate": "休闲",
# "gameid": 176279,
# "date": "2019-09-30",
# "watch": 65640,
# "reserve": 0,
# "sell": 0,
# "download": 0,
# "review": 199,
# "topic": 36,
# "__v": 0,
# "createdAt": "2019-09-30T07:15:54.041Z",
# "updatedAt": "2019-09-30T07:15:54.041Z"
# }
# event_3_3_8002 = {}
# event_3_3_8002['db'] = "beagle-production"
# event_3_3_8002['coll'] = "compay_record"
# event_3_3_8002['event_name'] = "event_3_3"
# event_3_3_8002['gameid'] = 8002
# event_3_3_8002['pipeline'] = [{'$match': {'createdAt': {'$gte': start_date, '$lt': end_date}}},
# {"$unwind": "$wechat_pay_result"}, {
# '$project': {'money_records': 1, 'pay_type': 1, 'account': 1, 'status': 1, 'money': 1, 'createdAt': 1,
# 'updatedAt': 1, 'return_code': '$wechat_pay_result.return_code',
# 'result_code': '$wechat_pay_result.result_code', 'return_msg': '$wechat_pay_result.return_msg',
# 'payment_no': '$wechat_pay_result.payment_no', 'payment_time': '$wechat_pay_result.payment_time',
# 'original_msg': '$wechat_pay_result.original_msg', 'payment_no': 1, 'payment_time': 1,
# 'distinct_id': '$_id', 'object_id': '$_id', '_id': 0}}]
#
#
# {
# "_id" : ObjectId("5d92b4f713c3487bd374c691"),
# "tags" : [
# "魔性",
# "io",
# "休闲"
# ],
# "source" : "taptap",
# "cateName" : "download",
# "icon" : "https://img.tapimg.com/market/lcs/601735dfe48b2fbac327851d75b0dc72_360.png?imageMogr2/auto-orient/strip",
# "order" : 1,
# "title" : "我的大刀四十米(测试版)",
# "author" : "厂商: 睡神飞工作室",
# "score" : 8.6,
# "desc" : "明明可以靠抄袭偏偏却要死磕原创。我不知道明明是谁反正我们是偏偏。全新玩法io类游戏来啦魔性欢快地砍来砍去吧开局一把刀越砍刀越长谁挡我砍谁我长我有理不服来砍我睡神飞工作室继《我飞刀玩得贼6》之后又一新品魔性翻倍快乐翻倍",
# "cate" : "休闲",
# "gameid" : 176279,
# "date" : "2019-09-30",
# "watch" : 118674,
# "reserve" : 0,
# "sell" : 0,
# "download" : 0,
# "review" : 274,
# "topic" : 49,
# "__v" : 0,
# "createdAt" : ISODate("2019-10-01T02:07:51.256Z"),
# "updatedAt" : ISODate("2019-10-01T02:07:51.256Z")
# }
from ops.mtga import FromTga
from ops.plog import define_logger
import logging
import datetime
import sys
from bson.objectid import ObjectId
import json
# Set up logging through ops.plog (presumably writing to this file), then
# take the module-level logger used throughout this script.
define_logger("/data/logs/ops/reptile2ss.log")
log = logging.getLogger(__name__)

# Source MongoDB connection settings; each job adds its own `dbname` and
# `collname` on a copy of this dict before connecting.
# NOTE(review): the header comment above mentions mongodb://10.10.5.4 while
# this points at 10.10.5.6 -- confirm which host is current.
MONGOINFO = dict(host='10.10.5.6', port=27017)

# Row fields coerced to float / int before a row is written to TGA.
conver_float = ('score',)
conver_int = (
    'order', 'gameid', 'watch', 'reserve',
    'sell', 'download', 'review', 'topic',
)
class CollectMongo:
    """Copy the rows produced by one MongoDB aggregation into a TGA event.

    ``item`` is a job description dict. The keys read here are ``db`` and
    ``coll`` (source database / collection), ``pipeline`` (passed to
    ``MongodbBase.query``), ``event_name`` (TGA event written for every
    row), ``tgaid`` (TGA project id) and the optional ``gameid``.
    """

    def __init__(self, item):
        """Store the job description and initialise TGA writing for ``item['tgaid']``."""
        # NOTE(review): the TGA endpoint and API secret are hard-coded in
        # source control; consider moving them to configuration.
        self.url = "http://10.10.3.17:8992/querySql"
        self.api_secret = "n9H4R32ZcjtSeN89ljCY6ESzTmOlnwwnOB3r4YsggnP5M1AXLtKtiS4sS1KKLOEQ"
        self.tga = FromTga(url=self.url, token=self.api_secret)
        self.item = item
        self.gameid = self.item.get('gameid')
        self.tgaid = self.item.get('tgaid')
        # Initialise the TGA write path for this project id.
        self.tga.init_tga_write(self.tgaid)

    def get_data_from_mongo(self):
        """Run the job's aggregation pipeline and return every result row as a list."""
        # Copy so the shared module-level settings are never mutated.
        dbset = MONGOINFO.copy()
        dbset['dbname'] = self.item.get('db')
        dbset['collname'] = self.item.get('coll')
        m = MongodbBase(**dbset)
        # NOTE(review): the MongodbBase instance is not explicitly closed;
        # check whether ops.mmongo exposes a close()/context manager.
        return list(m.query(self.item.get('pipeline')))

    @staticmethod
    def _coerce(line, field, convert):
        """Best-effort ``line[field] = convert(line[field])``; on failure log and store 0."""
        try:
            line[field] = convert(line[field])
        except (KeyError, TypeError, ValueError, OverflowError):
            # Use .get(): a missing field is one way to get here, and
            # indexing it again would raise out of the handler.
            log.error(f"correct {field}={line.get(field)!r} Failed")
            line[field] = 0

    @staticmethod
    def _normalize_row(line):
        """Rewrite one aggregation row in place into the flat shape sent to TGA."""
        # Flatten BSON ids and container values into strings. Only existing
        # keys are reassigned, so mutating during iteration is safe.
        for key, value in line.items():
            if isinstance(value, ObjectId):
                line[key] = str(value)
            elif isinstance(value, (list, tuple, dict)):
                line[key] = json.dumps(value)
        line['account_id'] = str(line['object_id'])
        line['#time'] = line['createdAt']
        # `tags` was JSON-encoded just above; decode it and store the
        # entries as one comma-separated string.
        try:
            line['tags'] = ",".join(json.loads(line['tags']))
        except (KeyError, TypeError, ValueError):
            log.error(f"correct tags={line.get('tags')!r} Failed")
            line['tags'] = ""
        for field in conver_float:
            CollectMongo._coerce(line, field, float)
        for field in conver_int:
            CollectMongo._coerce(line, field, int)

    def write2ss(self, data):
        """Normalise each row of ``data`` in place and write it to TGA.

        Falsy rows are skipped. Every failed write is logged individually.
        The summary success line is logged only when no write failed.
        ``data`` may be any iterable of dicts; ``None`` is treated as empty.
        """
        event_name = self.item.get('event_name')
        if data is None:
            data = ()
        nums = 0
        failed = False
        for line in data:
            # Count every row (including skipped ones), matching len(data)
            # for lists while also working for generators.
            nums += 1
            if not line:
                continue
            self._normalize_row(line)
            if not self.tga.put_event_data(line, event_name):
                log.error("write {}_{} event error! {}\n".format(event_name, self.tgaid, line))
                failed = True
        if not failed:
            log.info("write {}_{} num={} success\n".format(event_name, self.tgaid, nums))

    def run(self):
        """Fetch this job's rows from MongoDB and write them to TGA."""
        self.write2ss(self.get_data_from_mongo())
def read_mongo(start_date, end_date):
    """Push TapTap ``games`` documents with ``start_date <= createdAt < end_date`` to TGA.

    Builds the job description(s) consumed by ``CollectMongo`` and runs each
    one; the ``taptap.games`` job writes ``reptile_4_1`` events to TGA
    project 6.
    """
    print(f"start ={start_date},end={end_date}")
    # Fields copied from each document. `_id` is re-exposed as `object_id`
    # and suppressed under its own name.
    projection = {
        'tags': 1, 'source': 1, 'cateName': 1, 'icon': 1, 'order': 1,
        'title': 1, 'author': 1, 'score': 1, 'cate': 1, 'gameid': 1,
        'watch': 1, 'reserve': 1, 'sell': 1, 'download': 1, 'review': 1,
        'topic': 1, 'createdAt': 1, 'updatedAt': 1,
        'object_id': '$_id', '_id': 0,
    }
    jobs = [
        {
            'db': "taptap",
            'coll': "games",
            'event_name': "reptile_4_1",
            'tgaid': 6,
            'pipeline': [
                {'$match': {'createdAt': {'$gte': start_date, '$lt': end_date}}},
                {'$project': projection},
            ],
        },
    ]
    for job in jobs:
        CollectMongo(job).run()
if __name__ == "__main__":
    # Optional single CLI argument: the day to collect, as YYYY-MM-DD.
    # Without it, collect yesterday (local, naive time).
    if len(sys.argv) == 2:
        try:
            d = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d")
        except ValueError as err:
            # Report the raw argument: `d` is unbound when parsing fails.
            log.error(f"split args with start time failed ,args={sys.argv[1]}", exc_info=True)
            raise ValueError(f"split args with start time failed ,args={sys.argv[1]}") from err
    else:
        d = datetime.datetime.now() - datetime.timedelta(days=1)
    start_date = datetime.datetime(d.year, d.month, d.day)
    # Half-open window [midnight, next midnight): the pipeline filters with
    # `$lt`, so an end of 23:59:59 would drop the day's final second.
    end_date = start_date + datetime.timedelta(days=1)
    read_mongo(start_date, end_date)