# -*- coding: utf-8 -*-
"""Helpers for querying ThinkingData (TGA) over HTTP and writing events/user data via the TGA SDK."""
import requests
import json
import datetime
import os
from tgasdk.sdk import *
import pdb
import pymysql
from ops.mmysql import MysqlBase


# Setup command: pip install ThinkingDataSdk


def _escape_string(value):
    """Escape *value* for embedding in a SQL string literal.

    Uses ``pymysql.escape_string`` when the installed PyMySQL still exposes
    it at package level, otherwise falls back to
    ``pymysql.converters.escape_string``.
    """
    escape = getattr(pymysql, "escape_string", None)
    if escape is None:
        from pymysql.converters import escape_string as escape
    return escape(value)


class TgaUserData(object):
    """Plain record of user fields.

    The text fields exposed as properties (nickname, city, avatar_url,
    province, country, distinctid, from_appid) are passed through
    :meth:`check_byte` on assignment; ``sex`` is coerced to ``int``.
    """

    def __init__(self):
        self.gameid = 0
        self.accountid = ""
        self.distinctid = ""  # goes through the property setter below
        self._nickname = ""
        self._sex = 0
        self._avatar_url = ""
        self._city = ""
        self._province = ""
        self._country = ""
        self.createtime = ""
        self.is_hide = 0
        self.lastlogon = ""
        self.firstlogonip = ""
        # Not implemented yet -- placeholder.
        self.is_real = 1
        self.unionid = ""
        self.wid = ""
        self.channel = 0
        self.sid = ""
        self.ptid = ""
        self.from_appid = ""  # goes through the property setter below
        self.gamescore_his = 0
        self.gamescore_week = 0
        self.gamescore_last = 0

    def check_byte(self, i):
        """Return *i* SQL-escaped as text; falsy input yields ``""``.

        ``bytes`` input is decoded with the default codec before escaping.
        """
        if not i:
            return ""
        if isinstance(i, bytes):
            i = i.decode()
        return _escape_string(i)

    @property
    def nickname(self):
        return self._nickname

    @nickname.setter
    def nickname(self, nickname):
        self._nickname = self.check_byte(nickname)

    @property
    def sex(self):
        return self._sex

    @sex.setter
    def sex(self, sex):
        # Falsy values and values that cannot be converted both map to 0.
        if not sex:
            self._sex = 0
            return
        try:
            self._sex = int(sex)
        except (TypeError, ValueError, OverflowError) as e:
            print("ERROR get sex property from tga failed! output was {} ".format(e))
            self._sex = 0

    @property
    def city(self):
        return self._city

    @city.setter
    def city(self, city):
        self._city = self.check_byte(city)

    @property
    def avatar_url(self):
        return self._avatar_url

    @avatar_url.setter
    def avatar_url(self, avatar_url):
        self._avatar_url = self.check_byte(avatar_url)

    @property
    def province(self):
        return self._province

    @province.setter
    def province(self, province):
        self._province = self.check_byte(province)

    @property
    def country(self):
        return self._country

    @country.setter
    def country(self, country):
        self._country = self.check_byte(country)

    @property
    def distinctid(self):
        return self._distinctid

    @distinctid.setter
    def distinctid(self, distinctid):
        self._distinctid = self.check_byte(distinctid)

    @property
    def from_appid(self):
        return self._from_appid

    @from_appid.setter
    def from_appid(self, from_appid):
        self._from_appid = self.check_byte(from_appid)


class CheckWords(object):
    """Coerce loosely-typed values to ``int``."""

    def __init__(self):
        pass

    def run(self, item):
        """Return *item* as an ``int``, or 0 when that is not possible.

        Falsy input returns 0. ``str``/``bytes``/``int``/``float`` values are
        passed to ``int()``. For a list or tuple the first element is
        converted recursively. Any other type returns 0 (previously this
        fell through and returned ``None``).
        """
        if not item:
            return 0
        if isinstance(item, (str, bytes, int, float)):
            try:
                return int(item)
            except (TypeError, ValueError, OverflowError) as e:
                print("return values failed,output was {}".format(e))
                return 0
        if isinstance(item, (list, tuple)):
            return self.run(item[0])
        return 0


class GetTgaConfig():
    """Look up the TGA settings for a gameid in the ``tga`` database.

    The result dict holds ``url`` plus, when a row is found, ``suffix``,
    ``appid``, ``api_key`` and ``tgaid``.
    """

    def __init__(self):
        self.url = "http://10.10.3.17:8992/querySql"
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or the environment.
        self.TGA = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'tga'}

    def get_api_key(self, gameid, channel=6001):
        """Return the config dict for *gameid* / *channel*.

        Only ``url`` is present when the query returns no rows.
        """
        item = {'url': self.url}
        # NOTE(review): the statement is built by string formatting; callers
        # must pass trusted numeric ids for gameid and channel.
        sql = "select suffix,appid,api_secret,tgaid from tgainfo where gameid={} and channelid={} and " \
              "in_used=1;".format(gameid, channel)
        rows = MysqlBase(**self.TGA).query(sql)
        if rows:
            item['suffix'], item['appid'], item['api_key'], item['tgaid'] = rows[0]
        return item


class FromTga:
    """Query TGA through its HTTP SQL endpoint and write data through the TGA SDK."""

    def __init__(self, url, token):
        """Store the endpoint *url* and *token*.

        Raises:
            Exception: if *token* is falsy.
        """
        if not token:
            raise Exception("token not found in env !")
        self.url = url
        self.token = token
        self.output = "/data/logs/tga-report/"
        # Set by init_tga_write(); the put_* methods need it.
        self.tga = None

    def get_data(self, sql, timeout=None):
        """POST *sql* to the endpoint and return the decoded result rows.

        The response body is split on CRLF: the first line is a JSON header
        whose ``return_code`` must be 0, and every following non-empty line
        is parsed as one JSON row. Rows that fail to parse are reported and
        skipped.

        Args:
            sql: statement sent in the ``sql`` form field.
            timeout: forwarded to ``requests.post``; ``None`` (the default)
                waits indefinitely.

        Returns:
            A list of parsed rows, or ``None`` when the HTTP status is not OK
            or the header is missing, malformed, or reports a non-zero
            ``return_code``.
        """
        payload = {'token': self.token, 'sql': sql}
        r = requests.post(self.url, payload, timeout=timeout)
        if r.status_code != requests.codes.ok:
            print("connect tga failed!")
            return None

        lines = r.content.decode('utf-8').split('\r\n')
        try:
            header = json.loads(lines[0])
        except ValueError:
            header = None
        if not isinstance(header, dict) or header.get('return_code', None) != 0:
            print("get data from tga failed!,sql was {}".format(sql))
            return None

        output = []
        for row in lines[1:]:
            if not row:
                continue
            try:
                output.append(json.loads(row))
            except ValueError as e:
                print("转化数据失败,{} 提示为{}".format(row, e))
        return output

    def init_tga_write(self, tgaid):
        """Create ``self.tga`` as a ``TGAnalytics`` backed by a ``LoggingConsumer``.

        The consumer is pointed at ``<self.output>/<tgaid>``; the directory is
        created if it does not exist.

        ``TGAnalytics`` and the chosen consumer may also be imported
        explicitly, e.g.
        ``from tgasdk.sdk import TGAnalytics, BatchConsumer, LoggingConsumer, AsyncBatchConsumer``.
        """
        paths = "{}/{}".format(self.output, tgaid)
        # exist_ok avoids the check-then-create race of a separate isdir() test.
        os.makedirs(paths, mode=0o755, exist_ok=True)
        self.tga = TGAnalytics(LoggingConsumer(paths))

    def _close_tga(self, tga=None):
        """Flush and close ``self.tga``.

        *tga* is accepted for backward compatibility and is not used.
        """
        self.tga.flush()
        self.tga.close()

    def _split_user_data(self, data):
        """Split *data* into ``(distinct_id, account_id, properties)``.

        At least one of ``distinct_id`` / ``account_id`` must be truthy,
        otherwise a message is printed and ``None`` is returned. An id key is
        dropped from the returned properties only when its value is truthy.
        The caller's dict is left untouched so it can be reused.
        """
        distinct_id = data.get('distinct_id', None)
        account_id = data.get('account_id', None)
        if not (distinct_id or account_id):
            print("distinct_id 或 account_id 必须有一则有值!")
            return None
        properties = dict(data)
        if distinct_id:
            properties.pop('distinct_id')
        if account_id:
            properties.pop('account_id')
        return distinct_id, account_id, properties

    def _safe_split(self, data):
        """Run _split_user_data, reporting unexpected input; return its result or None."""
        try:
            return self._split_user_data(data)
        except Exception as e:
            print("拆解数据错误,输出为{}.请检查!".format(e))
            return None

    def put_event_data(self, data, event_name="Payment"):
        """Track one event through ``self.tga``.

        *data* carries ``distinct_id`` and/or ``account_id``; the remaining
        keys are sent as the event properties (for example ``"#time"``,
        ``"#ip"``, ``"Product_Name"``, ``"Price"``, ``"OrderId"``).

        Returns:
            ``True`` on success, ``False`` when the ids are missing or the
            SDK call raises.
        """
        parts = self._safe_split(data)
        if parts is None:
            return False
        distinct_id, account_id, new_data = parts
        # Upload the event with both the account ID and the distinct (visitor) ID.
        try:
            self.tga.track(distinct_id, account_id, event_name, new_data)
        except Exception as e:
            print("write to tga failed,output was {}".format(e))
            return False
        return True

    def put_user_data(self, data, method='user_set'):
        """Apply a user-profile operation through ``self.tga``.

        Args:
            data: dict with ``distinct_id`` and/or ``account_id``; the other
                keys are the properties (ignored for ``user_del``).
            method: one of ``user_set``, ``user_setOnce``, ``user_add``,
                ``user_del`` (case-insensitive).

        Returns:
            ``True`` on success, ``False`` when the ids are missing, the
            method is unknown, or the SDK call raises.
        """
        parts = self._safe_split(data)
        if parts is None:
            return False
        distinct_id, account_id, new_data = parts
        try:
            action = method.lower()
            if action == "user_set":
                self.tga.user_set(account_id=account_id, distinct_id=distinct_id, properties=new_data)
            elif action == "user_setonce":
                self.tga.user_setOnce(account_id=account_id, distinct_id=distinct_id, properties=new_data)
            elif action == "user_add":
                self.tga.user_add(account_id=account_id, distinct_id=distinct_id, properties=new_data)
            elif action == "user_del":
                self.tga.user_del(account_id=account_id, distinct_id=distinct_id)
            else:
                print("请提供用户操作类型 [user_set/user_setOnce/user_add/user_del] !")
                return False
        except Exception as e:
            print("write to tga failed,output was {}".format(e))
            return False
        return True


def main():
    """Manual smoke test: run a sample query and print the rows as JSON."""
    url = "http://10.10.3.17:8992/querySql"
    # Alternative sample query:
    # sql = "SELECT \"#server_time\",localuuid,ext FROM v_event_3 where \"#event_name\"='event_1_1'"
    sql = "SELECT distinct from_appid FROM v_event_22 where \"$part_event\"='event_11_1' and gameid='2001' and \"$part_date\"='2019-06-18'"
    # NOTE(review): token is hard-coded in source; consider reading it from
    # the environment.
    token = "ESnhwwLtVu7zO2h6SSTEZ1jYagbOet0Kur0XnpG9fVJF5ROsqUkcNO0inVyFtQd1"
    t = FromTga(url, token)
    # Example of writing an event (requires t.init_tga_write(<tgaid>) first):
    # data={'account_id': 1012, 'distinct_id': 1012, 'gameid': 1012, 'from_appid': 'wx62d9035fd4fd2059', 'time': '2019-03-10',
    #       'new_user': 2, 'active_user': 4, 'avg_runing_time': 0, 'old_user_login': 2, 'newuser_rungame': 2,
    #       'newuser_rungame_rate': 100.0, 'newuser_qlty': 0}
    # t.put_event_data(data)
    print(json.dumps(t.get_data(sql)))


if __name__ == "__main__":
    main()