新增实时数据采集脚本

This commit is contained in:
pengtao 2019-08-19 16:45:37 +08:00
parent 8302ea74ce
commit 06604cedda
5 changed files with 144 additions and 21 deletions

View File

@ -3,20 +3,5 @@
# Runtime configuration for the real-time data-collection job.
# Event tuples used elsewhere are laid out as (gameid, channel, event_type).
# gameid,channel,event_type
# Global debug switch, consulted by the (commented-out) event_list selection below.
is_debug = True
# MySQL connection settings for the reporting database.
# NOTE(review): credentials are hard-coded in source control — move them to an
# environment variable or a secret store.
DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'}
# Ad-slot state labels keyed by state code (values are UI strings, kept verbatim).
adv_id_state = {'1': '重新拉取', '0': '显示'}
# Ad event-type labels keyed by type code (values are UI strings, kept verbatim).
ad_type = {'1': '视屏启动', '2': '视频错误回调', '101': 'banner显示', '201': '插屏显示'}
# NOTE(review): historical event_list variants kept commented out for reference;
# consider deleting this dead block once it is no longer needed.
# if not is_debug:
#     event_list = (
#         (1016, 6001, 'shop_1016'), (1016, 6001, 'shop_new_1016'))
# else:
#     event_list = (
#         (1004, 6001, 'tap_1004'), (1004, 6001, 'tap_new_1004'), (1004, 6001, 'share'), (1004, 6001, 'share_new'),
#         (1004, 6001, 'stage'), (1004, 6001, 'stage_new'), (1004, 6001, 'ad_video'), (1004, 6001, 'ad_video_new'),
#         (1016, 6001, 'shop_1016'), (1016, 6001, 'shop_new_1016'), (1016, 6001, 'tap_1016'),
#         (1016, 6001, 'tap_new_1016'), (2001, 6001, 'tap_2001'), (2001, 6001, 'tap_new_2001'),
#         (1004, 6001, 'items_produce'), (1004, 6001, 'items_consum'), (1004, 6001, 'share_map'))
# event_list = ((1004, 6001, 'items_produce'), (1004, 6001, 'items_consum'),(1004, 6001, 'share_map'))
# InfluxDB connection settings used for the real-time metric points.
My_influx = {'host': '10.10.3.19', 'port': 8086, 'username': 'miles', 'password': 'aspect', 'database': 'useronline'}
# Measurement name under which all real-time points are written.
influx_meas = 'real_game_data'

View File

@ -16,6 +16,8 @@ class GetFromTga:
# NOTE(review): diff-rendered fragment of GetFromTga.__init__ — the viewer
# stripped indentation, and the hunk context suggests the method continues
# beyond the lines shown here.
def __init__(self, **kwargs):
# Start-of-collection logging kept disabled by the original author.
# log.info("begin collect gameid={},times={}!".format(kwargs.get('gameid'), kwargs.get('date')))
# Reporting window: date is the report day; b_time/e_time (new optional
# kwargs, default None) bound the real-time slice within that day.
self.date = kwargs.get('date')
self.b_time=kwargs.get('b_time',None)
self.e_time = kwargs.get('e_time', None)
# Identity of the event stream being collected.
self.gameid = kwargs.get('gameid')
self.channelid = kwargs.get('channelid')
self.event_type = kwargs.get('event_type')

30
ops/myinflux.py Normal file
View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
import logging
from influxdb import InfluxDBClient, exceptions
from config.config_real import My_influx
log = logging.getLogger(__name__)
class Myinflux:
    """Thin wrapper around ``InfluxDBClient`` for real-time measurement I/O.

    Connection parameters come from ``config.config_real.My_influx``.
    """

    def __init__(self):
        self.influxdb = InfluxDBClient(**My_influx)

    def write(self, body):
        """Write a list of point dicts.

        Returns True on success, False on a client error (the original
        discarded ``_write``'s result; returning it is backward-compatible).
        """
        return self._write(body)

    def _write(self, body):
        # Best-effort write: log and report failure instead of raising.
        try:
            self.influxdb.write_points(body)
            return True
        except exceptions.InfluxDBClientError:
            log.error(f"write {body} to influx failed!", exc_info=True)
            return False

    def _read(self, query):
        """Run a raw query; returns the result set, or False on client error."""
        try:
            # BUG FIX: the original referenced ``self.client``, which is never
            # assigned anywhere — the connection attribute is ``self.influxdb``,
            # so every read raised AttributeError.
            result = self.influxdb.query(query)
            return result
        except exceptions.InfluxDBClientError:
            # BUG FIX: the original log message said "write" on the read path.
            log.error(f"query {query} on influx failed!", exc_info=True)
            return False

View File

@ -1 +1,103 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from mytask.tga import FromTga
from config.config_real import influx_meas
from ops.myinflux import Myinflux
class RealData(FromTga):
    """Real-time event metrics: query the TGA presto views and push to InfluxDB.

    Each collector aggregates one event family over the inherited
    ``[b_time, e_time]`` window (attributes ``suffix``, ``gameid``,
    ``channelid``, ``b_time``, ``e_time`` come from ``FromTga``), grouped by
    ``ad_channel``, then writes one point per ad_channel.
    """

    def _push(self, point_type, ad_channel, fields):
        """Write one point for *ad_channel* under measurement ``influx_meas``."""
        body = [{
            "measurement": influx_meas,
            # NOTE(review): "bengin" is a long-standing typo for "begin"; kept
            # as-is because existing dashboards/queries may depend on the tag key.
            "tags": {"bengin": self.b_time, "end": self.e_time,
                     "gameid": self.gameid, "type": point_type,
                     "channelid": self.channelid,
                     # BUG FIX: without an ad_channel tag, every channel's point
                     # carried identical tags and overwrote the previous one.
                     "ad_channel": ad_channel},
            "fields": fields}]
        Myinflux().write(body)

    def event_user(self):
        """Collect new-user, login and distinct-login counts per ad_channel."""
        sql_new_user = f"""select
            count(DISTINCT "#account_id") ,
            ad_channel
            from
            v_event_{self.suffix}
            where
            "$part_event"='event_11_1'
            and gameid='{self.gameid}'
            and channel='{self.channelid}'
            and "account_register_date" between timestamp'{self.b_time}' and timestamp'{self.e_time}'
            GROUP BY ad_channel"""
        sql_login_user = f"""select
            count(1),
            count(distinct "#account_id") ,
            ad_channel
            from
            v_event_{self.suffix}
            where
            "$part_event"='event_11_1'
            and gameid='{self.gameid}'
            and channel='{self.channelid}'
            and "#server_time" between timestamp'{self.b_time}' and timestamp'{self.e_time}'
            GROUP BY ad_channel"""
        data = {}
        # BUG FIX: the original did data[ad_channel]['new_user'] = ... on a
        # missing inner dict, raising KeyError that a bare ``except: pass``
        # silently swallowed — so no metrics were ever accumulated or written.
        for new_user, ad_channel in self.get_data(sql_new_user) or ():
            data.setdefault(ad_channel, {})['new_user'] = new_user
        for logins, login_user, ad_channel in self.get_data(sql_login_user) or ():
            entry = data.setdefault(ad_channel, {})
            entry['logins'] = logins
            entry['login_user'] = login_user
        for ad_channel, entry in data.items():
            # A channel may appear in only one of the two result sets, so
            # default each missing counter to 0 instead of raising KeyError.
            self._push("real_event_user", ad_channel,
                       {"new_user": entry.get('new_user', 0),
                        "logins": entry.get('logins', 0),
                        "login_user": entry.get('login_user', 0)})

    def event_share(self):
        """Collect share counts and distinct sharers per ad_channel."""
        sql = f"""select
            count(1),
            count(distinct "#account_id"),
            ad_channel
            from
            v_event_{self.suffix}
            where
            "$part_event"='event_11_10'
            and gameid='{self.gameid}'
            and channel='{self.channelid}'
            and "#server_time" between timestamp'{self.b_time}' and timestamp'{self.e_time}'
            group by
            ad_channel"""
        data = {}
        # Same accumulation bug as event_user (missing inner dict hidden by a
        # bare except); fixed with setdefault.
        for shares, share_people, ad_channel in self.get_data(sql) or ():
            entry = data.setdefault(ad_channel, {})
            entry['shares'] = shares
            entry['share_people'] = share_people
        for ad_channel, entry in data.items():
            self._push("real_event_share", ad_channel,
                       {"shares": entry.get('shares', 0),
                        "share_people": entry.get('share_people', 0)})
def real_event_user(kwargs):
    """Task entry point: collect new_user / logins / login_user counts.

    *kwargs* carries gameid, channelid, event_type, date, b_time, e_time.
    """
    RealData(**kwargs).event_user()
def real_event_share(kwargs):
    """Task entry point: collect share counts and distinct sharers.

    *kwargs* carries gameid, channelid, event_type, date, b_time, e_time.
    """
    RealData(**kwargs).event_share()

View File

@ -33,15 +33,19 @@ def run_tasks():
class CollectGameData:
    """Drive one real-time collection pass per (gameid, channelid, event_type).

    NOTE(review): the diff view interleaved pre-change lines (the old
    one-argument ``__init__`` and the old ``run_event_`` dispatch); this is
    the reconstructed post-change class.
    """

    def __init__(self, times, b_time, e_time):
        # times: report date; b_time/e_time: bounds of the real-time window,
        # forwarded verbatim to every task's kwargs.
        self.times = times
        self.b_time = b_time
        self.e_time = e_time

    def workflow(self, line):
        """Run the ``real_event_<event_type>`` handler for one task tuple."""
        kwargs = {}
        kwargs['gameid'], kwargs['channelid'], kwargs['event_type'] = line
        kwargs['date'] = self.times
        kwargs['b_time'] = self.b_time
        kwargs['e_time'] = self.e_time
        name = f"real_event_{kwargs['event_type']}"
        log.info(f"run {name} kwargs={kwargs}!")
        # SECURITY/IDIOM FIX: dispatch through the module namespace instead of
        # eval() on a formatted string. Raises KeyError (instead of the old
        # NameError) when no handler exists for this event_type.
        globals()[name](kwargs)