diff --git a/config/config_real.py b/config/config_real.py index ef7a886..5694db3 100644 --- a/config/config_real.py +++ b/config/config_real.py @@ -3,20 +3,5 @@ # gameid,channel,event_type is_debug = True -DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'} -adv_id_state = {'1': '重新拉取', '0': '显示'} -ad_type = {'1': '视屏启动', '2': '视频错误回调', '101': 'banner显示', '201': '插屏显示'} - -# if not is_debug: -# event_list = ( -# (1016, 6001, 'shop_1016'), (1016, 6001, 'shop_new_1016')) -# else: -# event_list = ( -# (1004, 6001, 'tap_1004'), (1004, 6001, 'tap_new_1004'), (1004, 6001, 'share'), (1004, 6001, 'share_new'), -# (1004, 6001, 'stage'), (1004, 6001, 'stage_new'), (1004, 6001, 'ad_video'), (1004, 6001, 'ad_video_new'), -# (1016, 6001, 'shop_1016'), (1016, 6001, 'shop_new_1016'), (1016, 6001, 'tap_1016'), -# (1016, 6001, 'tap_new_1016'), (2001, 6001, 'tap_2001'), (2001, 6001, 'tap_new_2001'), -# (1004, 6001, 'items_produce'), (1004, 6001, 'items_consum'), (1004, 6001, 'share_map')) - - -# event_list = ((1004, 6001, 'items_produce'), (1004, 6001, 'items_consum'),(1004, 6001, 'share_map')) +My_influx = {'host': '10.10.3.19', 'port': 8086, 'username': 'miles', 'password': 'aspect', 'database': 'useronline'} +influx_meas = 'real_game_data' diff --git a/mytask/tga.py b/mytask/tga.py index 0607f28..6086c64 100644 --- a/mytask/tga.py +++ b/mytask/tga.py @@ -16,6 +16,8 @@ class GetFromTga: def __init__(self, **kwargs): # log.info("begin collect gameid={},times={}!".format(kwargs.get('gameid'), kwargs.get('date'))) self.date = kwargs.get('date') + self.b_time=kwargs.get('b_time',None) + self.e_time = kwargs.get('e_time', None) self.gameid = kwargs.get('gameid') self.channelid = kwargs.get('channelid') self.event_type = kwargs.get('event_type') diff --git a/ops/myinflux.py b/ops/myinflux.py new file mode 100644 index 0000000..3bb9475 --- /dev/null +++ b/ops/myinflux.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +import logging +from influxdb import InfluxDBClient, exceptions +from config.config_real import My_influx + +log = logging.getLogger(__name__) + + +class Myinflux: + def __init__(self): + self.influxdb = InfluxDBClient(**My_influx) + + def write(self, body): + self._write(body) + + def _write(self, body): + try: + self.influxdb.write_points(body) + return True + except exceptions.InfluxDBClientError: + log.error(f"write {body} to influx failed!", exc_info=True) + return False + + def _read(self, query): + try: + result = self.client.query(query) + return result + except exceptions.InfluxDBClientError: + log.error(f"write {query} to influx failed!", exc_info=True) + return False diff --git a/real_task/tasks_base_real.py b/real_task/tasks_base_real.py index 7c68785..b4c8781 100644 --- a/real_task/tasks_base_real.py +++ b/real_task/tasks_base_real.py @@ -1 +1,103 @@ -# -*- coding: utf-8 -*- \ No newline at end of file +# -*- coding: utf-8 -*- +from mytask.tga import FromTga +from config.config_real import influx_meas +from ops.myinflux import Myinflux + + +class RealData(FromTga): + def event_user(self): + sql_new_user = f"""select + count(DISTINCT "#account_id") , + ad_channel + from + v_event_{self.suffix} + where + "$part_event"='event_11_1' + and gameid='{self.gameid}' + and channel='{self.channelid}' + and "account_register_date" between timestamp'{self.b_time}' and timestamp'{self.e_time}' + GROUP BY ad_channel""" + sql_login_user = f"""select + count(1), + count(distinct "#account_id") , + ad_channel + from + v_event_{self.suffix} + where + "$part_event"='event_11_1' + and gameid='{self.gameid}' + and channel='{self.channelid}' + and "#server_time" between timestamp'{self.b_time}' and timestamp'{self.e_time}' + GROUP BY ad_channel""" + + out_new = self.get_data(sql_new_user) + out_login = self.get_data(sql_login_user) + data = {} + try: + for line in out_new: + new_user, ad_channel = line + data[ad_channel]['new_user'] = new_user + except: + pass + + try: + for l in out_login: + logins, login_user, ad_channel = l + data[ad_channel]['logins'] = logins + data[ad_channel]['login_user'] = login_user + except: + pass + + for key in data.keys(): + body = [{"measurement": influx_meas, + "tags": {"bengin": self.b_time, "end": self.e_time, "gameid": self.gameid, "type": "real_event_user", + "channelid": self.channelid}, + "fields": {"new_user": data[key]['new_user'], "logins": data[key]['logins'], + "login_user": data[key]['login_user']}}] + influx = Myinflux() + influx.write(body) + + + def event_share(self): + sql = f"""select + count(1), + count(distinct "#account_id"), + ad_channel + from + v_event_{self.suffix} + where + "$part_event"='event_11_10' + and gameid='{self.gameid}' + and channel='{self.channelid}' + and "#server_time" between timestamp'{self.b_time}' and timestamp'{self.e_time}' + group by + ad_channel""" + out = self.get_data(sql) + data = {} + for line in out: + try: + shares, share_people, ad_channel = line + data[ad_channel]['shares'] = shares + data[ad_channel]['share_people'] = share_people + except: + pass + + for key in data.keys(): + body = [{"measurement": influx_meas, + "tags": {"bengin": self.b_time, "end": self.e_time, "gameid": self.gameid, "type": "real_event_share", + "channelid": self.channelid}, + "fields": {"shares": data[key]['shares'], "share_people": data[key]['share_people']}}] + influx = Myinflux() + influx.write(body) + + +def real_event_user(kwargs): + # new_user,active_user,login_user + rd = RealData(**kwargs) + rd.event_user() + + +def real_event_share(kwargs): + # share_by_people,share_nums + rd = RealData(**kwargs) + rd.event_share() diff --git a/real_tasks.py b/real_tasks.py index ca3464c..40a8f61 100644 --- a/real_tasks.py +++ b/real_tasks.py @@ -33,15 +33,19 @@ def run_tasks(): class CollectGameData(): - def __init__(self, times): + def __init__(self, times, b_time, e_time): self.times = times + self.b_time = b_time + self.e_time = e_time + def workflow(self, line): kwargs = {} kwargs['gameid'], kwargs['channelid'], kwargs['event_type'] = line kwargs['date'] = self.times - - func = f"run_event_{kwargs['event_type']}(kwargs)" + kwargs['b_time'] = self.b_time + kwargs['e_time'] = self.e_time + func = f"real_event_{kwargs['event_type']}(kwargs)" log.info(f"run {func} kwargs={kwargs}!") eval(func)