添加实时数据采集
This commit is contained in:
parent
cc16077385
commit
8302ea74ce
22
config/config_real.py
Normal file
22
config/config_real.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# gameid,channel,event_type
|
||||||
|
is_debug = True
|
||||||
|
|
||||||
|
DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'}
|
||||||
|
adv_id_state = {'1': '重新拉取', '0': '显示'}
|
||||||
|
ad_type = {'1': '视屏启动', '2': '视频错误回调', '101': 'banner显示', '201': '插屏显示'}
|
||||||
|
|
||||||
|
# if not is_debug:
|
||||||
|
# event_list = (
|
||||||
|
# (1016, 6001, 'shop_1016'), (1016, 6001, 'shop_new_1016'))
|
||||||
|
# else:
|
||||||
|
# event_list = (
|
||||||
|
# (1004, 6001, 'tap_1004'), (1004, 6001, 'tap_new_1004'), (1004, 6001, 'share'), (1004, 6001, 'share_new'),
|
||||||
|
# (1004, 6001, 'stage'), (1004, 6001, 'stage_new'), (1004, 6001, 'ad_video'), (1004, 6001, 'ad_video_new'),
|
||||||
|
# (1016, 6001, 'shop_1016'), (1016, 6001, 'shop_new_1016'), (1016, 6001, 'tap_1016'),
|
||||||
|
# (1016, 6001, 'tap_new_1016'), (2001, 6001, 'tap_2001'), (2001, 6001, 'tap_new_2001'),
|
||||||
|
# (1004, 6001, 'items_produce'), (1004, 6001, 'items_consum'), (1004, 6001, 'share_map'))
|
||||||
|
|
||||||
|
|
||||||
|
# event_list = ((1004, 6001, 'items_produce'), (1004, 6001, 'items_consum'),(1004, 6001, 'share_map'))
|
1
real_task/__init__.py
Normal file
1
real_task/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
1
real_task/tasks_base_real.py
Normal file
1
real_task/tasks_base_real.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
59
real_tasks.py
Normal file
59
real_tasks.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from ops.mmysql import MysqlBase
|
||||||
|
import logging
|
||||||
|
from multiprocessing import Pool, cpu_count
|
||||||
|
import datetime
|
||||||
|
from real_task.tasks_base_real import *
|
||||||
|
from config.config_real import *
|
||||||
|
import sys
|
||||||
|
from ops.plog import define_logger
|
||||||
|
|
||||||
|
define_logger("/data/logs/data_collect_real.log")
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_event():
|
||||||
|
sql = f"""SELECT gameid,channelid,method_name FROM methods WHERE in_used=1"""
|
||||||
|
db = MysqlBase(**DB)
|
||||||
|
data = db.query(sql)
|
||||||
|
events = []
|
||||||
|
if data:
|
||||||
|
for line in data:
|
||||||
|
if line:
|
||||||
|
events.append(line)
|
||||||
|
return events
|
||||||
|
|
||||||
|
|
||||||
|
def run_tasks():
|
||||||
|
args = get_event()
|
||||||
|
pool = Pool(processes=cpu_count())
|
||||||
|
pool.map(simple_work, args)
|
||||||
|
pool.close()
|
||||||
|
pool.join()
|
||||||
|
|
||||||
|
|
||||||
|
class CollectGameData():
|
||||||
|
def __init__(self, times):
|
||||||
|
self.times = times
|
||||||
|
|
||||||
|
def workflow(self, line):
|
||||||
|
kwargs = {}
|
||||||
|
kwargs['gameid'], kwargs['channelid'], kwargs['event_type'] = line
|
||||||
|
kwargs['date'] = self.times
|
||||||
|
|
||||||
|
func = f"run_event_{kwargs['event_type']}(kwargs)"
|
||||||
|
log.info(f"run {func} kwargs={kwargs}!")
|
||||||
|
eval(func)
|
||||||
|
|
||||||
|
|
||||||
|
def simple_work(line):
|
||||||
|
try:
|
||||||
|
times = sys.argv[1]
|
||||||
|
except:
|
||||||
|
times = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
|
||||||
|
cc = CollectGameData(times)
|
||||||
|
cc.workflow(line)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
run_tasks()
|
Loading…
x
Reference in New Issue
Block a user