调整模板之间的引用关系

This commit is contained in:
pengtao 2019-08-12 12:01:20 +08:00
parent 5e329b28ae
commit e48e09057d
13 changed files with 505 additions and 35 deletions

1
config/__init__.py Normal file
View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -3,6 +3,7 @@
# gameid,channel,event_type
is_debug = True
DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'}
adv_id_state = {'1': '重新拉取', '0': '显示'}
ad_type = {'1': '视屏启动', '2': '视频错误回调', '101': 'banner显示', '201': '插屏显示'}

View File

@ -1,10 +1,8 @@
# -*- coding: utf-8 -*-
from tga import GetFromTga
from mytask.tga import GetFromTga
import logging
import json
import pdb
log = logging.getLogger(__name__)

View File

@ -1,9 +1,6 @@
# -*- coding: utf-8 -*-
from tga import GetFromTga
from mytask.tga import GetFromTga
import logging
import json
import pdb
log = logging.getLogger(__name__)

View File

@ -1,9 +1,6 @@
# -*- coding: utf-8 -*-
from tga import GetFromTga
from mytask.tga import GetFromTga
import logging
import json
import pdb
log = logging.getLogger(__name__)
tap_items = {'1': '登录按钮', '2': '进入游戏主界面', '3': '单人模式', '4': '组队模式', '5': '角色', '6': '观看视频试用皮肤按钮',

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
from tga import GetFromTga
from mytask.tga import GetFromTga
import logging
import json
log = logging.getLogger(__name__)

View File

@ -4,7 +4,7 @@ from ops.plog import define_logger
import logging
import requests
import json
from config import is_debug, adv_id_state, ad_type
from config.config import is_debug, adv_id_state, ad_type
define_logger("/data/logs/reports.log")
log = logging.getLogger(__name__)

1
ops/__init__.py Normal file
View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

171
ops/mmysql.py Normal file
View File

@ -0,0 +1,171 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
date: 2016/07/11
role: mysql的增删改查类
usage: m = mysqlBase(host='xxx',db='xxx',user='xxx',pwd='xxx') 实例化
m.insert('core',{'host_name':'ccc','process_name':'ddd','ip_addr':'192.168.136.41','status':'4'})
m.update('table',{'field1':'value1','field2':'value2'},'id=1') 更新表名 字段名 条件
m.delete('core','status=5 and id=12')
m.change("update core set a='aaa' where id=1") 可以多条插入
m.query("select * from core")
"""
import logging
import warnings
try:
import MySQLdb
except:
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
log = logging.getLogger(__name__)
###mysql操作类
class MysqlBase:
###连接数据库
def __init__(self, **args):
###获取参数
self.host = args.get('host', 'localhost')
self.user = args.get('user')
self.pswd = args.get('pswd')
self.db = args.get('db', 'mysql')
self.port = args.get('port', '3306')
self.charset = args.get('charset', 'utf8')
try:
self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pswd, db=self.db,
port=int(self.port), charset=self.charset)
self.curs = self.conn.cursor()
self.curs.execute('SET NAMES utf8')
except:
log.error('%s mysql connect error' % self.host)
raise ValueError('mysql connect error %s' % self.host)
###释放资源
def __del__(self):
self.curs.close()
self.conn.close()
###插入
def insert(self, table, data):
_field = ','.join(['`%s`' % (k_insert) for k_insert in data.keys()])
_value = ','.join(["'%s'" % (str(v_insert).replace("'", "\'")) for v_insert in data.values()])
###拼接成sql语句
_sql = 'INSERT INTO `%s`(%s) VALUES(%s)' % (table, _field, _value)
###执行
self.curs.lastrowid = 0
try:
self.curs.execute(_sql)
###提交
self.conn.commit() # log.info('%s insert ' % _sql)
except:
self.conn.rollback()
log.error('%s insert error' % _sql)
raise ValueError('112,insert error %s' % _sql)
return self.curs.lastrowid
###更新
def update(self, table, data, condition):
_field = ','.join(["`%s`='%s'" % (k_update, str(data[k_update]).replace("'", "\'")) for k_update in data])
_sql = 'UPDATE `%s` SET %s WHERE %s' % (table, _field, condition)
###执行
resNum = 0
try:
resNum = self.curs.execute(_sql)
###提交
self.conn.commit() # log.info('%s update ' % _sql)
except:
self.conn.rollback()
log.error('%s update error' % _sql)
raise ValueError('update error %s' % _sql)
return resNum
###删除
def delete(self, table, condition):
_sql = 'DELETE FROM `%s` WHERE %s' % (table, condition)
###执行
resNum = 0
try:
resNum = self.curs.execute(_sql)
###提交
self.conn.commit() # log.info('%s delete ' % _sql)
except:
self.conn.rollback()
log.error('%s delete error' % _sql)
raise ValueError('112,delete error %s' % _sql)
return resNum
###直接给修改语句执行
def change(self, sql, many=False):
###过滤unknow table的warning
warnings.filterwarnings('ignore')
resNum = 0
if many:
try:
###多条同时插入
resNum = self.curs.executemany(sql, many)
self.conn.commit() # log.info('%s exec ' % sql)
except:
self.conn.rollback()
log.error('%s exec error' % sql)
raise ValueError('exec error %s' % sql)
else:
try:
resNum = self.curs.execute(sql)
###提交
self.conn.commit() # log.info('%s exec ' % sql)
except:
self.conn.rollback()
log.error('%s exec error' % sql)
raise ValueError('112,exec error %s' % sql)
return resNum
###查询
def query(self, sql):
res = ''
try:
self.curs.execute(sql)
res = self.curs.fetchall() # log.info('%s query ' % sql)
except:
log.error('%s query error' % sql)
# raise ValueError('query error %s'% sql)
return res
if __name__ == "__main__":
args = dict()
args['host'] = '172.16.17.164'
args['user'] = 'miles'
args['pswd'] = 'aspect'
args['db'] = 'test'
sql_sel = "SELECT * FROM bigdata_host LIMIT 5"
m = MysqlBase(**args)
data = m.query(sql=sql_sel)
m.insert('bigdata_host',
{'hostname': 'ccc', 'remark': 'ddd', 'up_addr_p': '192.168.136.41', 'states': '4', 'enter_time': '2017-03-13'})
m.delete('bigdata_host', 'hostname="ccc"')

293
ops/mtga.py Normal file
View File

@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
import requests
import json
import datetime
import os
from tgasdk.sdk import *
import pdb
import pymysql
from ops.mmysql import MysqlBase
# 初始化命令pip install ThinkingDataSdk
class TgaUserData(object):
def __init__(self):
self.gameid = 0
self.accountid = ""
self.distinctid = ""
self._nickname = ""
self._sex = 0
self._avatar_url = ""
self._city = ""
self._province = ""
self._country = ""
self.createtime = ""
self.is_hide = 0
self.lastlogon = ""
self.firstlogonip = ""
# 未实现 先占坑
self.is_real = 1
self.unionid = ""
self.wid = ""
self.channel = 0
self.sid = ""
self.ptid = ""
self.from_appid = ""
self.gamescore_his = 0
self.gamescore_week = 0
self.gamescore_last = 0
def check_byte(self, i):
if not i:
return ""
elif isinstance(i, bytes):
return pymysql.escape_string(i.decode())
else:
return pymysql.escape_string(i)
@property
def nickname(self):
return self._nickname
@nickname.setter
def nickname(self, nickname):
self._nickname = self.check_byte(nickname)
@property
def sex(self):
return self._sex
@sex.setter
def sex(self, sex):
if sex:
try:
self._sex = int(sex)
except Exception as e:
print("ERROR get sex property from tga failed! output was {} ".format(e))
self._sex = 0
else:
self._sex = 0
@property
def city(self):
return self._city
@city.setter
def city(self, city):
self._city = self.check_byte(city)
@property
def avatar_url(self):
return self._avatar_url
@avatar_url.setter
def avatar_url(self, avatar_url):
self._avatar_url = self.check_byte(avatar_url)
@property
def province(self):
return self._province
@province.setter
def province(self, province):
self._province = self.check_byte(province)
@property
def country(self):
return self._country
@country.setter
def country(self, country):
self._country = self.check_byte(country)
@property
def distinctid(self):
return self._distinctid
@distinctid.setter
def distinctid(self, distinctid):
self._distinctid = self.check_byte(distinctid)
@property
def from_appid(self):
return self._from_appid
@from_appid.setter
def from_appid(self, from_appid):
self._from_appid = self.check_byte(from_appid)
class CheckWords(object):
def __init__(self):
pass
def run(self, item):
if not item:
return 0
if isinstance(item, (str, bytes, int, float)):
try:
return int(item)
except Exception as e:
print("return values failed,output was {}".format(e))
return 0
if isinstance(item, (list, tuple)):
return self.run(item[0])
# 按提供的gameid从tga数据库中查询并输出其对应的url,suffix,appid,api_keytgaid以dict形式存放
class GetTgaConfig():
def __init__(self):
self.url = "http://10.10.3.17:8992/querySql"
self.TGA = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'tga'}
def get_api_key(self, gameid, channel=6001):
item = {}
item['url'] = self.url
sql = "select suffix,appid,api_secret,tgaid from tgainfo where gameid={} and channelid={} and " \
"in_used=1;".format(gameid, channel)
t = MysqlBase(**self.TGA)
data = t.query(sql)
if data:
item['suffix'], item['appid'], item['api_key'], item['tgaid'] = data[0]
return item
class FromTga:
def __init__(self, url, token):
if not token:
raise Exception("{0} token not found in env !")
self.url = url
self.token = token
self.output = "/data/logs/tga-report/"
def get_data(self, sql):
data = {'token': self.token, 'sql': sql}
r = requests.post(self.url, data)
if r.status_code != requests.codes.ok:
print("connect tga failed!")
return None
out = r.content.decode('utf-8')
if json.loads(out.split('\r\n')[0]).get('return_code', None) != 0:
# raise Exception("get data from tga failed!")
print("get data from tga failed!,sql was {}".format(sql))
return None
data_out = out.split('\r\n')[1:]
output = list()
for row in data_out:
if row:
try:
output.append(json.loads(row))
except Exception as e:
print("转化数据失败,{} 提示为{}".format(row, e))
return output
def init_tga_write(self, tgaid):
'''
from tgasdk.sdk import TGAnalytics, BatchConsumer, LoggingConsumer, AsyncBatchConsumer
也可引入TGAnalytics与指定的Consumer
'''
# now = datetime.date.today().strftime('%Y%m%d%H%M%S')
# # 初始化SDK
# filename = "{}/report_{}_{}.log".format(self.output, project, now)
# 检查目录是否存在,如不存在创建之
paths = "{}/{}".format(self.output, tgaid)
if not os.path.isdir(paths):
os.makedirs(paths, mode=0o755)
self.tga = TGAnalytics(LoggingConsumer(paths))
def _close_tga(self, tga):
self.tga.flush()
self.tga.close()
def _split_user_data(self, data):
distinct_id = data.get('distinct_id', None)
account_id = data.get('account_id', None)
if not (distinct_id or account_id):
print("distinct_id 或 account_id 必须有一则有值!")
return None
if distinct_id:
data.pop('distinct_id')
if account_id:
data.pop('account_id')
return distinct_id, account_id, data
def put_event_data(self, data, event_name="Payment"):
# tga = self._init_tga_write()
try:
distinct_id, account_id, new_data = self._split_user_data(data)
except Exception as e:
print("拆解数据错误,输出为{}.请检查!".format(e))
return False
# properties = {
# "#time": datetime.datetime.now(),
# "#ip": "192.168.1.1",
# "Product_Name": "月卡",
# "Price": 30,
# "OrderId": "abc_123"
# }
# 上传事件包含账号ID与访客ID
try:
self.tga.track(distinct_id, account_id, event_name, new_data)
except Exception as e:
print("write to tga failed,output was {}".format(e))
# self._close_tga(tga)
return False
# finally:
# self._close_tga(tga)
return True
def put_user_data(self, data, method='user_set'):
# tga = self._init_tga_write()
try:
distinct_id, account_id, new_data = self._split_user_data(data)
except Exception as e:
print("拆解数据错误,输出为{}.请检查!".format(e))
return False
try:
if method.lower() == "user_set":
self.tga.user_set(account_id=account_id, distinct_id=distinct_id, properties=new_data)
elif method.lower() == "user_setonce":
self.tga.user_setOnce(account_id=account_id, distinct_id=distinct_id, properties=new_data)
elif method.lower() == "user_add":
self.tga.user_add(account_id=account_id, distinct_id=distinct_id, properties=new_data)
elif method.lower() == "user_del":
self.tga.user_del(account_id=account_id, distinct_id=distinct_id)
else:
print("请提供用户操作类型 [user_set/user_setOnce/user_add/user_del] !")
return False
except Exception as e:
print("write to tga failed,output was {}".format(e))
return False
# finally:
# self._close_tga(tga)
return True
def main():
url = "http://10.10.3.17:8992/querySql"
# sql = "SELECT \"#server_time\",localuuid,ext FROM v_event_3 where \"#event_name\"='event_1_1'"
sql = "SELECT DISTINCT from_appid FROM v_event_22 WHERE \"$part_event\"='event_11_1' AND gameid='2001' AND \"$part_date\"='2019-06-18'"
token = "ESnhwwLtVu7zO2h6SSTEZ1jYagbOet0Kur0XnpG9fVJF5ROsqUkcNO0inVyFtQd1"
t = FromTga(url, token)
# t._init_tga_write()
# data={'account_id': 1012, 'distinct_id': 1012, 'gameid': 1012, 'from_appid': 'wx62d9035fd4fd2059', 'time': '2019-03-10',
# 'new_user': 2, 'active_user': 4, 'avg_runing_time': 0, 'old_user_login': 2, 'newuser_rungame': 2,
# 'newuser_rungame_rate': 100.0, 'newuser_qlty': 0}
# t.put_event_data(data)
print(json.dumps(t.get_data(sql)))
if __name__ == "__main__":
main()

27
ops/plog.py Normal file
View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import logging
def define_logger(filename="/data/logs/aa.log",debug=True):
logger = logging.getLogger("")
if debug==True:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.ERROR)
# 设置输出格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 设置日志文件处理器
fh = logging.FileHandler(filename)
fh.setFormatter(formatter) # 为这个处理器添加格式
# 设置屏幕stdout输出处理器
formatter_stdout = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
sh = logging.StreamHandler(stream=None)
sh.setFormatter(formatter_stdout)
# 把处理器加到logger上
logger.addHandler(fh)
# logger.addHandler(sh)

View File

@ -13,12 +13,13 @@ import logging
from tornado import gen
import tornado.options
import json
from apscheduler.schedulers.tornado import TornadoScheduler
from config.config import *
#from apscheduler.schedulers.tornado import TornadoScheduler
from tasks import run_tasks
from urllib.parse import unquote
from config import *
sched = TornadoScheduler()
#sched = TornadoScheduler()
define_logger("/data/logs/report_interface.log")
log = logging.getLogger(__name__)
tornado.options.define("port", default=5021, type=int, help="run server on the given port.")

View File

@ -7,15 +7,15 @@ from mytask.tasks_1004 import *
from mytask.tasks_base import *
from mytask.tasks_1016 import *
from mytask.tasks_2001 import *
import config
from config.config import *
import sys
log = logging.getLogger(__name__)
DB = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'games_report'}
def run_tasks():
args = config.event_list
args = event_list
pool = Pool(processes=cpu_count())
pool.map(simple_work, args)
pool.close()
@ -41,22 +41,6 @@ def simple_work(line):
cc.workflow(line)
#
# def get_args():
# get_event_args = f"""select gameid,channel,event_type in report_event where in_used=1"""
# mydb = MysqlBase(**DB)
# data = mydb.query(get_event_args)
# args = list()
# if data:
# for line in data:
# temp = {}
# try:
# temp['gameid'], temp['channel'], temp['event_type'] = line
# args.append(temp)
# except Exception:
# log.error(f"get args from {line} error !", exc_info=True)
# return args
if __name__ == "__main__":
# run_tasks()