diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..7c68785 --- /dev/null +++ b/config/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/config.py b/config/config.py similarity index 89% rename from config.py rename to config/config.py index de8dd59..fdadeab 100644 --- a/config.py +++ b/config/config.py @@ -3,6 +3,7 @@ # gameid,channel,event_type is_debug = True +DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'} adv_id_state = {'1': '重新拉取', '0': '显示'} ad_type = {'1': '视屏启动', '2': '视频错误回调', '101': 'banner显示', '201': '插屏显示'} diff --git a/mytask/tasks_1004.py b/mytask/tasks_1004.py index 798e2a6..dc3342f 100644 --- a/mytask/tasks_1004.py +++ b/mytask/tasks_1004.py @@ -1,10 +1,8 @@ # -*- coding: utf-8 -*- -from tga import GetFromTga +from mytask.tga import GetFromTga import logging import json -import pdb - log = logging.getLogger(__name__) diff --git a/mytask/tasks_1016.py b/mytask/tasks_1016.py index 150cff1..d3441d3 100644 --- a/mytask/tasks_1016.py +++ b/mytask/tasks_1016.py @@ -1,9 +1,6 @@ # -*- coding: utf-8 -*- -from tga import GetFromTga +from mytask.tga import GetFromTga import logging -import json - -import pdb log = logging.getLogger(__name__) diff --git a/mytask/tasks_2001.py b/mytask/tasks_2001.py index 6560c8c..b8d4fac 100644 --- a/mytask/tasks_2001.py +++ b/mytask/tasks_2001.py @@ -1,9 +1,6 @@ # -*- coding: utf-8 -*- -from tga import GetFromTga +from mytask.tga import GetFromTga import logging -import json - -import pdb log = logging.getLogger(__name__) tap_items = {'1': '登录按钮', '2': '进入游戏主界面', '3': '单人模式', '4': '组队模式', '5': '角色', '6': '观看视频试用皮肤按钮', diff --git a/mytask/tasks_base.py b/mytask/tasks_base.py index c5a5ea4..e9542da 100644 --- a/mytask/tasks_base.py +++ b/mytask/tasks_base.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- -from tga import GetFromTga +from mytask.tga import GetFromTga import logging -import json log = logging.getLogger(__name__) diff --git a/tga.py b/mytask/tga.py similarity index 99% rename from tga.py rename to mytask/tga.py index b0878e3..8fcb3f1 100644 --- a/tga.py +++ b/mytask/tga.py @@ -4,7 +4,7 @@ from ops.plog import define_logger import logging import requests import json -from config import is_debug, adv_id_state, ad_type +from config.config import is_debug, adv_id_state, ad_type define_logger("/data/logs/reports.log") log = logging.getLogger(__name__) diff --git a/ops/__init__.py b/ops/__init__.py new file mode 100644 index 0000000..7c68785 --- /dev/null +++ b/ops/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/ops/mmysql.py b/ops/mmysql.py new file mode 100644 index 0000000..0d41a10 --- /dev/null +++ b/ops/mmysql.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +""" +date: 2016/07/11 +role: mysql的增删改查类 +usage: m = mysqlBase(host='xxx',db='xxx',user='xxx',pwd='xxx') 实例化 + m.insert('core',{'host_name':'ccc','process_name':'ddd','ip_addr':'192.168.136.41','status':'4'}) + m.update('table',{'field1':'value1','field2':'value2'},'id=1') 更新表名 字段名:值 条件 + m.delete('core','status=5 and id=12') + m.change("update core set a='aaa' where id=1") 可以多条插入 + m.query("select * from core") +""" +import logging +import warnings + +try: + import MySQLdb +except: + import pymysql + + pymysql.install_as_MySQLdb() + import MySQLdb + +log = logging.getLogger(__name__) + + +###mysql操作类 +class MysqlBase: + + ###连接数据库 + def __init__(self, **args): + + ###获取参数 + self.host = args.get('host', 'localhost') + self.user = args.get('user') + self.pswd = args.get('pswd') + self.db = args.get('db', 'mysql') + self.port = args.get('port', '3306') + self.charset = args.get('charset', 'utf8') + + try: + self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pswd, db=self.db, + port=int(self.port), charset=self.charset) + + self.curs = self.conn.cursor() + + self.curs.execute('SET NAMES utf8') + except: + log.error('%s mysql connect error' % self.host) + raise ValueError('mysql connect error %s' % self.host) + + ###释放资源 + def __del__(self): + self.curs.close() + self.conn.close() + + ###插入 + + def insert(self, table, data): + _field = ','.join(['`%s`' % (k_insert) for k_insert in data.keys()]) + _value = ','.join(["'%s'" % (str(v_insert).replace("'", "\'")) for v_insert in data.values()]) + ###拼接成sql语句 + _sql = 'INSERT INTO `%s`(%s) VALUES(%s)' % (table, _field, _value) + + ###执行 + self.curs.lastrowid = 0 + try: + self.curs.execute(_sql) + ###提交 + self.conn.commit() # log.info('%s insert ' % _sql) + + except: + self.conn.rollback() + log.error('%s insert error' % _sql) + raise ValueError('112,insert error %s' % _sql) + + return self.curs.lastrowid + + ###更新 + def update(self, table, data, condition): + _field = ','.join(["`%s`='%s'" % (k_update, str(data[k_update]).replace("'", "\'")) for k_update in data]) + + _sql = 'UPDATE `%s` SET %s WHERE %s' % (table, _field, condition) + + ###执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + ###提交 + self.conn.commit() # log.info('%s update ' % _sql) + except: + self.conn.rollback() + log.error('%s update error' % _sql) + raise ValueError('update error %s' % _sql) + + return resNum + + ###删除 + def delete(self, table, condition): + _sql = 'DELETE FROM `%s` WHERE %s' % (table, condition) + + ###执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + ###提交 + self.conn.commit() # log.info('%s delete ' % _sql) + + except: + self.conn.rollback() + log.error('%s delete error' % _sql) + raise ValueError('112,delete error %s' % _sql) + + return resNum + + ###直接给修改语句执行 + def change(self, sql, many=False): + ###过滤unknow table的warning + warnings.filterwarnings('ignore') + resNum = 0 + if many: + try: + ###多条同时插入 + resNum = self.curs.executemany(sql, many) + self.conn.commit() # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('exec error %s' % sql) + else: + try: + resNum = self.curs.execute(sql) + ###提交 + self.conn.commit() # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('112,exec error %s' % sql) + + return resNum + + ###查询 + def query(self, sql): + res = '' + try: + self.curs.execute(sql) + res = self.curs.fetchall() # log.info('%s query ' % sql) + + except: + log.error('%s query error' % sql) + + # raise ValueError('query error %s'% sql) + + return res + + +if __name__ == "__main__": + args = dict() + args['host'] = '172.16.17.164' + args['user'] = 'miles' + args['pswd'] = 'aspect' + args['db'] = 'test' + sql_sel = "SELECT * FROM bigdata_host LIMIT 5" + m = MysqlBase(**args) + data = m.query(sql=sql_sel) + m.insert('bigdata_host', + {'hostname': 'ccc', 'remark': 'ddd', 'up_addr_p': '192.168.136.41', 'states': '4', 'enter_time': '2017-03-13'}) + m.delete('bigdata_host', 'hostname="ccc"') diff --git a/ops/mtga.py b/ops/mtga.py new file mode 100644 index 0000000..6998ab9 --- /dev/null +++ b/ops/mtga.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- + +import requests +import json +import datetime +import os +from tgasdk.sdk import * +import pdb +import pymysql +from ops.mmysql import MysqlBase + + + +# 初始化命令:pip install ThinkingDataSdk + +class TgaUserData(object): + def __init__(self): + self.gameid = 0 + self.accountid = "" + self.distinctid = "" + self._nickname = "" + self._sex = 0 + self._avatar_url = "" + self._city = "" + self._province = "" + self._country = "" + self.createtime = "" + self.is_hide = 0 + self.lastlogon = "" + self.firstlogonip = "" + # 未实现 先占坑 + self.is_real = 1 + self.unionid = "" + self.wid = "" + self.channel = 0 + self.sid = "" + self.ptid = "" + self.from_appid = "" + self.gamescore_his = 0 + self.gamescore_week = 0 + self.gamescore_last = 0 + + def check_byte(self, i): + if not i: + return "" + elif isinstance(i, bytes): + return pymysql.escape_string(i.decode()) + else: + return pymysql.escape_string(i) + + @property + def nickname(self): + return self._nickname + + @nickname.setter + def nickname(self, nickname): + self._nickname = self.check_byte(nickname) + + @property + def sex(self): + return self._sex + + @sex.setter + def sex(self, sex): + if sex: + try: + self._sex = int(sex) + except Exception as e: + print("ERROR get sex property from tga failed! output was {} ".format(e)) + self._sex = 0 + else: + self._sex = 0 + + @property + def city(self): + return self._city + + @city.setter + def city(self, city): + self._city = self.check_byte(city) + + @property + def avatar_url(self): + return self._avatar_url + + @avatar_url.setter + def avatar_url(self, avatar_url): + self._avatar_url = self.check_byte(avatar_url) + + @property + def province(self): + return self._province + + @province.setter + def province(self, province): + self._province = self.check_byte(province) + + @property + def country(self): + return self._country + + @country.setter + def country(self, country): + self._country = self.check_byte(country) + + @property + def distinctid(self): + return self._distinctid + + @distinctid.setter + def distinctid(self, distinctid): + self._distinctid = self.check_byte(distinctid) + + @property + def from_appid(self): + return self._from_appid + + @from_appid.setter + def from_appid(self, from_appid): + self._from_appid = self.check_byte(from_appid) + + +class CheckWords(object): + def __init__(self): + pass + + def run(self, item): + if not item: + return 0 + if isinstance(item, (str, bytes, int, float)): + try: + return int(item) + except Exception as e: + print("return values failed,output was {}".format(e)) + return 0 + + if isinstance(item, (list, tuple)): + return self.run(item[0]) + + +# 按提供的gameid从tga数据库中查询并输出其对应的url,suffix,appid,api_key,tgaid以dict形式存放 +class GetTgaConfig(): + def __init__(self): + self.url = "http://10.10.3.17:8992/querySql" + self.TGA = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'tga'} + + def get_api_key(self, gameid, channel=6001): + item = {} + item['url'] = self.url + sql = "select suffix,appid,api_secret,tgaid from tgainfo where gameid={} and channelid={} and " \ + "in_used=1;".format(gameid, channel) + t = MysqlBase(**self.TGA) + data = t.query(sql) + if data: + item['suffix'], item['appid'], item['api_key'], item['tgaid'] = data[0] + return item + + +class FromTga: + def __init__(self, url, token): + if not token: + raise Exception("{0} token not found in env !") + self.url = url + self.token = token + self.output = "/data/logs/tga-report/" + + def get_data(self, sql): + data = {'token': self.token, 'sql': sql} + r = requests.post(self.url, data) + if r.status_code != requests.codes.ok: + print("connect tga failed!") + return None + + out = r.content.decode('utf-8') + if json.loads(out.split('\r\n')[0]).get('return_code', None) != 0: + # raise Exception("get data from tga failed!") + print("get data from tga failed!,sql was {}".format(sql)) + return None + + data_out = out.split('\r\n')[1:] + output = list() + for row in data_out: + if row: + try: + output.append(json.loads(row)) + except Exception as e: + print("转化数据失败,{} 提示为{}".format(row, e)) + return output + + def init_tga_write(self, tgaid): + ''' + from tgasdk.sdk import TGAnalytics, BatchConsumer, LoggingConsumer, AsyncBatchConsumer + 也可引入TGAnalytics与指定的Consumer + ''' + # now = datetime.date.today().strftime('%Y%m%d%H%M%S') + # # 初始化SDK + # filename = "{}/report_{}_{}.log".format(self.output, project, now) + # 检查目录是否存在,如不存在创建之 + paths = "{}/{}".format(self.output, tgaid) + if not os.path.isdir(paths): + os.makedirs(paths, mode=0o755) + + self.tga = TGAnalytics(LoggingConsumer(paths)) + + def _close_tga(self, tga): + self.tga.flush() + self.tga.close() + + def _split_user_data(self, data): + distinct_id = data.get('distinct_id', None) + account_id = data.get('account_id', None) + + if not (distinct_id or account_id): + print("distinct_id 或 account_id 必须有一则有值!") + return None + + if distinct_id: + data.pop('distinct_id') + if account_id: + data.pop('account_id') + + return distinct_id, account_id, data + + def put_event_data(self, data, event_name="Payment"): + # tga = self._init_tga_write() + try: + distinct_id, account_id, new_data = self._split_user_data(data) + except Exception as e: + print("拆解数据错误,输出为{}.请检查!".format(e)) + return False + + # properties = { + # "#time": datetime.datetime.now(), + # "#ip": "192.168.1.1", + # "Product_Name": "月卡", + # "Price": 30, + # "OrderId": "abc_123" + # } + + # 上传事件,包含账号ID与访客ID + try: + self.tga.track(distinct_id, account_id, event_name, new_data) + except Exception as e: + print("write to tga failed,output was {}".format(e)) + # self._close_tga(tga) + return False + # finally: + # self._close_tga(tga) + return True + + def put_user_data(self, data, method='user_set'): + # tga = self._init_tga_write() + try: + distinct_id, account_id, new_data = self._split_user_data(data) + except Exception as e: + print("拆解数据错误,输出为{}.请检查!".format(e)) + return False + try: + if method.lower() == "user_set": + self.tga.user_set(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_setonce": + self.tga.user_setOnce(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_add": + self.tga.user_add(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_del": + self.tga.user_del(account_id=account_id, distinct_id=distinct_id) + else: + print("请提供用户操作类型 [user_set/user_setOnce/user_add/user_del] !") + return False + except Exception as e: + print("write to tga failed,output was {}".format(e)) + return False + # finally: + # self._close_tga(tga) + return True + + +def main(): + url = "http://10.10.3.17:8992/querySql" + # sql = "SELECT \"#server_time\",localuuid,ext FROM v_event_3 where \"#event_name\"='event_1_1'" + sql = "SELECT DISTINCT from_appid FROM v_event_22 WHERE \"$part_event\"='event_11_1' AND gameid='2001' AND \"$part_date\"='2019-06-18'" + token = "ESnhwwLtVu7zO2h6SSTEZ1jYagbOet0Kur0XnpG9fVJF5ROsqUkcNO0inVyFtQd1" + t = FromTga(url, token) + # t._init_tga_write() + # data={'account_id': 1012, 'distinct_id': 1012, 'gameid': 1012, 'from_appid': 'wx62d9035fd4fd2059', 'time': '2019-03-10', + # 'new_user': 2, 'active_user': 4, 'avg_runing_time': 0, 'old_user_login': 2, 'newuser_rungame': 2, + # 'newuser_rungame_rate': 100.0, 'newuser_qlty': 0} + # t.put_event_data(data) + print(json.dumps(t.get_data(sql))) + + +if __name__ == "__main__": + main() diff --git a/ops/plog.py b/ops/plog.py new file mode 100644 index 0000000..065ccb7 --- /dev/null +++ b/ops/plog.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import logging + + +def define_logger(filename="/data/logs/aa.log",debug=True): + logger = logging.getLogger("") + if debug==True: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.ERROR) + + # 设置输出格式 + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # 设置日志文件处理器 + fh = logging.FileHandler(filename) + fh.setFormatter(formatter) # 为这个处理器添加格式 + + # 设置屏幕stdout输出处理器 + formatter_stdout = logging.Formatter('%(name)s - %(levelname)s - %(message)s') + sh = logging.StreamHandler(stream=None) + sh.setFormatter(formatter_stdout) + + # 把处理器加到logger上 + logger.addHandler(fh) +# logger.addHandler(sh) diff --git a/report_interface.py b/report_interface.py index f767130..4b1c6af 100644 --- a/report_interface.py +++ b/report_interface.py @@ -13,12 +13,13 @@ import logging from tornado import gen import tornado.options import json -from apscheduler.schedulers.tornado import TornadoScheduler +from config.config import * +#from apscheduler.schedulers.tornado import TornadoScheduler from tasks import run_tasks from urllib.parse import unquote from config import * -sched = TornadoScheduler() +#sched = TornadoScheduler() define_logger("/data/logs/report_interface.log") log = logging.getLogger(__name__) tornado.options.define("port", default=5021, type=int, help="run server on the given port.") diff --git a/tasks.py b/tasks.py index 4b2f9d2..dc38e7f 100644 --- a/tasks.py +++ b/tasks.py @@ -7,15 +7,15 @@ from mytask.tasks_1004 import * from mytask.tasks_base import * from mytask.tasks_1016 import * from mytask.tasks_2001 import * -import config +from config.config import * import sys log = logging.getLogger(__name__) -DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'} + def run_tasks(): - args = config.event_list + args = event_list pool = Pool(processes=cpu_count()) pool.map(simple_work, args) pool.close() @@ -41,22 +41,6 @@ def simple_work(line): cc.workflow(line) -# -# def get_args(): -# get_event_args = f"""select gameid,channel,event_type in report_event where in_used=1""" -# mydb = MysqlBase(**DB) -# data = mydb.query(get_event_args) -# args = list() -# if data: -# for line in data: -# temp = {} -# try: -# temp['gameid'], temp['channel'], temp['event_type'] = line -# args.append(temp) -# except Exception: -# log.error(f"get args from {line} error !", exc_info=True) -# return args - if __name__ == "__main__": # run_tasks()