commit 30ffb7062e55e6cf004c25f75cb81c0203cc5692 Author: pengtao Date: Fri Apr 17 11:37:13 2020 +0800 new diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..7c68785 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/add_path.py b/add_path.py new file mode 100644 index 0000000..035f196 --- /dev/null +++ b/add_path.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +import sysconfig +import os +import subprocess + +lib_path = sysconfig.get_paths()['purelib'] + +if not lib_path: + raise Exception("python lib like site-packages not found!") + +current_path = os.path.abspath('.') + +cmd = "ln -s {0} {1}/ops".format(current_path, lib_path) +cmdref = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + shell=True) +output, error_info = cmdref.communicate() +if cmdref.returncode != 0: + error_msg = "run cmd {0} failed,error was {1}".format(cmd, error_info) + raise Exception(error_msg) diff --git a/aliyun_api.py b/aliyun_api.py new file mode 100644 index 0000000..07497cb --- /dev/null +++ b/aliyun_api.py @@ -0,0 +1,85 @@ +from oss2 import SizedFileAdapter, determine_part_size +from oss2.models import PartInfo +import oss2 +from aliyunsdkcore.client import AcsClient +from aliyunsdkcdn.request.v20180510 import RefreshObjectCachesRequest +from ops.mlog import log +import os +import json +import hashlib +import base64 + +""" +pip install oss2 aliyun-python-sdk-core aliyun-python-sdk-cdn + +阿里云api通用类 +""" + + +class AliApi(object): + def __init__(self, access_key_id=None, access_key_secret=None): + self.access_key_id = access_key_id or os.environ.get('ALI_ACCESS_KEY_ID') + self.access_key_secret = access_key_secret or os.environ.get('ALI_ACCESS_KEY_SECRET') + + def calculate_file_md5(self, file_name, block_size=64 * 1024): + """计算文件的MD5 + :param file_name: 文件名 + :param block_size: 计算MD5的数据块大小,默认64KB + :return 文件内容的MD5值 + """ + with open(file_name, 'rb') as f: + md5 = hashlib.md5() + while True: + data = f.read(block_size) + if not data: + break + md5.update(data) + + return base64.b64encode(md5.digest()).decode() + + def oss_upload(self, endpoint, bucket_name, file, key): + auth = oss2.Auth(self.access_key_id, self.access_key_secret) + bucket = oss2.Bucket(auth, endpoint, bucket_name) + + total_size = os.path.getsize(file) + # determine_part_size方法用来确定分片大小。 + part_size = determine_part_size(total_size, preferred_size=100 * 1024) + + # 初始化分片。 + upload_id = bucket.init_multipart_upload(key).upload_id + parts = [] + + try: + # 逐个上传分片。 + with open(file, 'rb') as fileobj: + part_number = 1 + offset = 0 + while offset < total_size: + num_to_upload = min(part_size, total_size - offset) + # SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 + result = bucket.upload_part(key, upload_id, part_number, SizedFileAdapter(fileobj, num_to_upload), + headers={'Content-MD5': self.calculate_file_md5(file)}) + parts.append(PartInfo(part_number, result.etag)) + + offset += num_to_upload + part_number += 1 + + # 完成分片上传。 + bucket.complete_multipart_upload(key, upload_id, parts) + log.info("{0} upload complete".format(file)) + except: + log.info("{0} upload failed".format(file)) + + def cdn_refresh_dir(self, refresh_url, region='cn-shanghai'): + client = AcsClient(self.access_key_id, self.access_key_secret, region) + + request = RefreshObjectCachesRequest.RefreshObjectCachesRequest() + request.set_ObjectPath(refresh_url) + if refresh_url.endswith('/'): + request.set_ObjectType('Directory') + else: + request.set_ObjectType('File') + response = client.do_action_with_exception(request) + rel = json.loads(response.decode('utf-8')) + return rel + diff --git a/clearn.py b/clearn.py new file mode 100644 index 0000000..6b4dd7b --- /dev/null +++ b/clearn.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import os +import sys +import pdb + +current_path = sys.argv[1] +remove_list = ('pyc', 'old', 'log1') +remove_files = [] + + +def searchDirFile(rootDir, saveDir): + for dir_or_file in os.listdir(rootDir): + filePath = os.path.join(rootDir, dir_or_file) + + if os.path.isfile(filePath): + if os.path.basename(filePath).endswith() in remove_list: + print('imgBox fileName is ' + os.path.basename(filePath)) + remove_files.append(os.path.join(rootDir, filePath)) + else: + continue + elif os.path.isdir(filePath): + searchDirFile(filePath, saveDir) + else: + print('not file and dir ' + os.path.basename(filePath)) + + + +remove_files = searchDirFile(current_path, remove_files) + +pdb.set_trace() +print(f"{remove_files}") # if remove_files: +# for item in remove_files: +# os.remove(item) diff --git a/clog.py b/clog.py new file mode 100644 index 0000000..cbbaec4 --- /dev/null +++ b/clog.py @@ -0,0 +1,15 @@ +import logging.handlers + + +def get_logger(task_id): + logger = logging.getLogger(__file__) + logger.handlers.clear() + http_handler = logging.handlers.HTTPHandler( + 'ops.kingsome.cn', + '/api/log/?type=task&task_id={}'.format(task_id), + method='GET', + ) + logger.setLevel(logging.INFO) + logger.addHandler(http_handler) + + return logger diff --git a/collect_data_ad.py b/collect_data_ad.py new file mode 100644 index 0000000..4f52e35 --- /dev/null +++ b/collect_data_ad.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +import datetime +from ops.plog import define_logger +import logging +import os +from ops.mtga import FromTga, GetTgaConfig + +if os.path.isdir("/data/logs"): + define_logger("/data/logs/collect_data.log") +else: + define_logger("/tmp/collect_data.log") + +log = logging.getLogger(__name__) + + +class CollectGameDataAD: + def __init__(self, **kwargs): + self.gameid = kwargs.get('gameid', 0) + self.channelid = kwargs.get('channelid', 6001) + self.times = kwargs.get('times', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + self.date = datetime.datetime.strptime(self.times, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d") + self.ad_channel = kwargs.get('ad_channel', '') + + gc = GetTgaConfig() + self.url = gc.url + item = gc.get_api_key(self.gameid, self.channelid) + self.suffix = item.get('suffix', None) + self.tga = FromTga(url=self.url, token=item.get('api_key', None)) + + def _run_tga_sql(self, sql): + data = self.tga.get_data(sql) + try: + if data: + return int(data[0][0]) + else: + return 0 + except Exception: + log.error(f"get data ={data} error!", exc_info=True) + return 0 + + def get_DAU(self): + sql = f"""SELECT count(distinct \"#account_id\") FROM v_event_{self.suffix} where "$part_event"='event_11_1' AND gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""""" + return self._run_tga_sql(sql) + + + def get_registerUser(self): + sql = f"""SELECT count(distinct "#user_id") FROM v_user_{self.suffix} where gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND account_register_time <= TIMESTAMP '{self.times}'""" + return self._run_tga_sql(sql) + + + def get_newuser(self): + sql = f"""SELECT count(distinct "#account_id") FROM v_event_{self.suffix} where gameid='{self.gameid}' and "$part_event"='event_11_1' and ad_channel='{self.ad_channel}' and account_register_date between TIMESTAMP'{self.date} 00:00:00' and TIMESTAMP'{self.times}'""" + return self._run_tga_sql(sql) + + def get_loginuser(self): + sql = f"""SELECT count(1) FROM v_event_{self.suffix} where gameid='{self.gameid}' and +"$part_event"='event_11_1' and ad_channel='{self.ad_channel}' and "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""" + return self._run_tga_sql(sql) + + def get_sharenum(self): + sql = f"""SELECT count(1) FROM v_event_{self.suffix} where "$part_event"='event_11_10' and gameid='{self.gameid}' and ad_channel='{self.ad_channel}' and "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""" + print(f"1 {sql}") + return self._run_tga_sql(sql) + + + def get_sharebypeople(self): + sql = f"""SELECT count(distinct \"#account_id\") FROM v_event_{self.suffix} where "$part_event"='event_11_10' and gameid='{self.gameid}' and ad_channel='{self.ad_channel}' and "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""" + return self._run_tga_sql(sql) + + def get_retain(self, next_step=1): + now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + btimes = (datetime.datetime.strptime(now, '%Y-%m-%d %H:%M:%S') - datetime.timedelta(days=next_step)).strftime('%Y-%m-%d') + login_sql = f"""select count(distinct "#account_id") from v_event_{self.suffix} where +"$part_event"='event_11_1' AND gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND account_register_date +BETWEEN TIMESTAMP'{btimes} 00:00:00' AND TIMESTAMP'{btimes} 23:59:59' AND "$part_date"='{now[:10]}' """ + newuser_sql = f"""select count(distinct "#account_id") from v_event_{self.suffix} where gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND account_register_date BETWEEN TIMESTAMP'{btimes} 00:00:00' AND TIMESTAMP'{btimes} 23:59:59'""" + loguser = self._run_tga_sql(login_sql) + newuser = self._run_tga_sql(newuser_sql) + try: + retain_rate = round((100 * loguser / newuser), 2) + except Exception: + retain_rate = 0 + log.error("collect retain failed!", exc_info=True) + print(f"2 {retain_rate} {loguser} {newuser}") + return retain_rate + diff --git a/common.py b/common.py new file mode 100644 index 0000000..d5787b6 --- /dev/null +++ b/common.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +import sys +from .mlog import log +import functools + + +def tarce(f): + @functools.wraps(f) + def decorated_function(*args, **kwargs): + print(f"func name was {__name__}, args was {f}, {args}, {kwargs}") + result = f(*args, **kwargs) + print(f"return was {result}!") + + return decorated_function + + +def send_ops_mail(sub, body, sendto): + import requests + mail_url = 'http://10.10.3.10:5011/mail/api/v2.0/alarm' + data = {"sub": sub, "content": body, "sendto": sendto} + r = requests.post(url=mail_url, data=data) + if r.status_code == 200: + log.info("send mail success!") + + else: + err_msg = "send mail failed ,output was %s" % r.content + log.error("send mail failed ,{0}".format(err_msg)) + + +def send_alter_mail(sub, body, sendto): + import requests + mail_url = 'http://10.10.3.16:50010/mail/api/v2.0/send' + data = {"sub": sub, "content": body, "sendto": sendto} + r = requests.post(url=mail_url, data=data) + if r.status_code == 200: + log.info("send mail success!") + else: + err_msg = "send mail failed ,output was %s" % r.content + log.error("send mail failed ,{0}".format(err_msg)) + + +def run_cmd(cmd): + import subprocess + msg = "Starting run: %s " % cmd + #log.info("run_cmd {0}".format(msg)) + cmdref = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + output, error_info = cmdref.communicate() + if cmdref.returncode != 0: + if isinstance(error_info, list) or isinstance(error_info, tuple): + error_info = error_info[0] + #msg = "RUN %s ERROR,error info: %s" % (cmd, error_info) + #log.error(msg) + return False, error_info + + else: + msg = "run %s success" % cmd + #log.info(msg) + # print "Run Success!!" + return True, output + + +def str_datetime(step=0): + import datetime + return (datetime.datetime.now() - datetime.timedelta(int(step))).strftime("%Y-%m-%d %H:%M:%S") + + +def str_short_time(step=0): + import datetime + return (datetime.datetime.now() - datetime.timedelta(int(step))).strftime("%Y%m%d%H%M%S") + + +def write_files(filename, data): + with open(filename, 'a+') as f: + f.write(data) + f.write('\n') + + +def read_files(filename): + with open(filename, 'r') as f: + data = f.read().strip() + return data + + +def cal_crc32(data): + import binascii + return binascii.crc32(data.encode()) + + +def cal_db_num(data, db=16): + import binascii + data_crc = binascii.crc32(data.encode()) + return int(data_crc % db) + 1 + + +def to_str(bytes_or_str): + if isinstance(bytes_or_str, bytes): + value = bytes_or_str.decode('utf-8') + else: + value = bytes_or_str + return value + + +# 返回格式化时间的下一天 +def next_day(day, i=1): + import datetime + try: + begin = datetime.datetime.strptime(day, '%Y-%m-%d') + except: + raise Exception("PLS input time as '2019-03-01!'") + next_day = (begin + datetime.timedelta(days=i)) + return datetime.datetime.strftime(next_day, "%Y-%m-%d") + + +def class2dict(a): + try: + dd = dict((name, getattr(a, name)) for name in dir(a) if + (not name.startswith('_') and not name.startswith('__') and not callable(getattr(a, name)))) + except Exception as e: + print("{} 转换到dict失败 ,{}".format(a, e)) + return None + return dd # a=open(filename,'r').read() # time.strftime("%Y%m%d %H:%M:%S") # print( '20180807 15:23:29') # # time.strptime("20180702","%Y%m%d") # time.struct_time(tm_year=2018, tm_mon=7, tm_mday=2, tm_hour=0, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=183, tm_isdst=-1) diff --git a/gamedata2001.py b/gamedata2001.py new file mode 100644 index 0000000..de14862 --- /dev/null +++ b/gamedata2001.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +class Data2001(): + def __init__(self): + self.gameid = 0 + self.fromappid = "" # 渠道ID + self.date = "" # 报表日期 + self.dau = 0 # 游戏当日的活跃用户,整数 + self.registerUser = 0 # 累计注册 + self.loginUser = 0 # 游戏当日的用户启动次数,整数 + self.newUser = 0 # 游戏当日的初次注册的用户数量,整数 + self.timedelay = 0 # 游戏当日的用户人均停留时长,单位秒,整数 + self.shareNum = 0 # 游戏当日所有用户的分享总次数,整数 + self.sharePeople = 0 # 游戏当日有过分享行为的用户数量(去重),整数 + + self.regSession = 0 # 会话注册比率,百分比,记录到百分位 + self.dauSession = 0 # 会话活跃比率,百分比,记录到百分位 + self.adExposure = 0 # banner广告曝光量,整数 + self.adClick = 0 # banner广告点击量,整数 + self.adClickRate = 0 # banner广告点击率,百分比,记录到百分位 + self.adIncome = 0 # banner广告收入,记录到百分位 + + self.movieExposure = 0 # 激励式视频曝光量,整数 + self.movieClick = 0 # 激励式视频点击量,整数 + self.movieClickRate = 0 # 激励式视频点击率,整数 + self.movieIncome = 0 # 激励式视频广告收入,记录到百分位 + self.adTotalIncome = 0 # 广告总收入,记录到百分位 + self.vPay = 0 # 虚拟支付,记录到百分位 + + self.shareRate = 0 # 用户分享率,百分比,记录到百分位 + self.shareRatePeople = 0 # 分享用户人均分享次数,记录到百分位 + self.newConversionRateSession = 0 # 会话新增转化率,百分比,记录到百分位 + self.k = 0 # K值 数字,记录到百分位 + self.totalIncome = 0 # 总收入 记录到百分位 + self.ARPU = 0 # 每用户平均收入,记录到千分位 + self.newUserSession = 0 # 会话注册人数,整数 + self.dauActiveSession = 0 # DAU会话人数 + diff --git a/gamedatabase.py b/gamedatabase.py new file mode 100644 index 0000000..f602ee9 --- /dev/null +++ b/gamedatabase.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +class Data(): + def __init__(self): + self.gameid = 0 + self.fromappid = "" # 渠道ID + self.date = "" # 报表日期 + self.dau = 0 # 游戏当日的活跃用户,整数 + self.registerUser = 0 # 累计注册 + self.loginUser = 0 # 游戏当日的用户启动次数,整数 + self.newUser = 0 # 游戏当日的初次注册的用户数量,整数 + self.timedelay = 0 # 游戏当日的用户人均停留时长,单位秒,整数 + self.shareNum = 0 # 游戏当日所有用户的分享总次数,整数 + self.sharePeople = 0 # 游戏当日有过分享行为的用户数量(去重),整数 + + self.regSession = 0 # 会话注册比率,百分比,记录到百分位 + self.dauSession = 0 # 会话活跃比率,百分比,记录到百分位 + self.adExposure = 0 # banner广告曝光量,整数 + self.adClick = 0 # banner广告点击量,整数 + self.adClickRate = 0 # banner广告点击率,百分比,记录到百分位 + self.adIncome = 0 # banner广告收入,记录到百分位 + + self.movieExposure = 0 # 激励式视频曝光量,整数 + self.movieClick = 0 # 激励式视频点击量,整数 + self.movieClickRate = 0 # 激励式视频点击率,整数 + self.movieIncome = 0 # 激励式视频广告收入,记录到百分位 + self.adTotalIncome = 0 # 广告总收入,记录到百分位 + self.vPay = 0 # 虚拟支付,记录到百分位 + + self.shareRate = 0 # 用户分享率,百分比,记录到百分位 + self.shareRatePeople = 0 # 分享用户人均分享次数,记录到百分位 + self.newConversionRateSession = 0 # 会话新增转化率,百分比,记录到百分位 + self.k = 0 # K值 数字,记录到百分位 + self.totalIncome = 0 # 总收入 记录到百分位 + self.ARPU = 0 # 每用户平均收入,记录到千分位 + self.newUserSession = 0 # 会话注册人数,整数 + self.dauActiveSession = 0 # DAU会话人数 + diff --git a/mansible.py b/mansible.py new file mode 100644 index 0000000..3f74591 --- /dev/null +++ b/mansible.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import tempfile +from collections import namedtuple +from ansible.parsing.dataloader import DataLoader +from ansible.vars.manager import VariableManager +from ansible.inventory.manager import InventoryManager +from ansible.playbook.play import Play +from ansible.executor.playbook_executor import PlaybookExecutor +from ansible.executor.task_queue_manager import TaskQueueManager +from ansible.plugins.callback import CallbackBase +import json +import pdb +import copy +import time + + +def write_host(ip_addr): + host_file = "%s/host%s" % ('/tmp', time.strftime("%Y%m%d%H%M%s")) + with open(host_file, 'a+') as f: + for x in ip_addr.split(','): + f.writelines(x + '\n') + return host_file + + +class ResultsCallback(CallbackBase): + def __init__(self, *args, **kwargs): + super(ResultsCallback, self).__init__(*args, **kwargs) + self.task_ok = {} + self.task_unreachable = {} + self.task_failed = {} + self.task_skipped = {} + self.task_stats = {} + # self.host_ok = {} + # self.host_unreachable = {} + # self.host_failed = {} + + def v2_runner_on_unreachable(self, result): + self.task_unreachable[result._host.get_name()] = result + + def v2_runner_on_ok(self, result, *args, **kwargs): + self.task_ok[result._host.get_name()] = result + + def v2_runner_on_failed(self, result, *args, **kwargs): + self.task_failed[result._host.get_name()] = result + + def v2_runner_on_skipped(self, result, *args, **kwargs): + self.task_skipped[result._host.get_name()] = result + + def v2_runner_on_stats(self, result, *args, **kwargs): + self.task_stats[result._host.get_name()] = result + + +class AnsibleAPI(object): + def __init__(self, hostfile, *args, **kwargs): + self.loader = DataLoader() + self.results_callback = ResultsCallback() + # if not os.path.isfile(hostfile): + # raise Exception("%s file not found!" % hostfile) + self.inventory = InventoryManager(loader=self.loader, sources=[hostfile]) + self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory) + self.passwords = None + Options = namedtuple('Options', + ['connection', + 'remote_user', + 'ask_sudo_pass', + 'verbosity', + 'ack_pass', + 'module_path', + 'forks', + 'become', + 'become_method', + 'become_user', + 'check', + 'listhosts', + 'listtasks', + 'listtags', + 'syntax', + 'sudo_user', + 'sudo', + 'diff']) + # 初始化需要的对象 + self.options = Options(connection='smart', + remote_user=None, + ack_pass=None, + sudo_user=None, + forks=5, + sudo=None, + ask_sudo_pass=False, + verbosity=5, + module_path=None, + become=None, + become_method=None, + become_user=None, + check=False, + diff=False, + listhosts=None, + listtasks=None, + listtags=None, + syntax=None) + + @staticmethod + def deal_result(info): + host_ips = list(info.get('success').keys()) + info['success'] = host_ips + + error_ips = info.get('failed') + error_msg = {} + for key, value in error_ips.items(): + temp = {} + temp[key] = value.get('stderr') + error_msg.update(temp) + info['failed'] = error_msg + return info + # return json.dumps(info) + + def run(self, module_name, module_args): + play_source = dict( + name="Ansible Play", + hosts='all', + gather_facts='no', + tasks=[ + dict(action=dict(module=module_name, args=module_args), register='shell_out'), + # dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))) + ] + ) + play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) + + tqm = None + try: + tqm = TaskQueueManager( + inventory=self.inventory, + variable_manager=self.variable_manager, + loader=self.loader, + options=self.options, + passwords=self.passwords, + stdout_callback=self.results_callback, + ) + result = tqm.run(play) + finally: + if tqm is not None: + tqm.cleanup() + + ##定义字典用于接收或者处理结果 + result_raw = {'success': {}, 'failed': {}, 'unreachable': {}, 'skipped': {}, 'status': {}} + + # 循环打印这个结果,success,failed,unreachable需要每个都定义一个 + for host, result in self.results_callback.task_ok.items(): + result_raw['success'][host] = result._result + for host, result in self.results_callback.task_failed.items(): + result_raw['failed'][host] = result._result + for host, result in self.results_callback.task_unreachable.items(): + result_raw['unreachable'][host] = result._result + + result_full = copy.deepcopy(result_raw) + result_json = self.deal_result(result_raw) + if not result_json['failed']: + return result_json + else: + return result_full + # return result_raw + + def run_playbook(self, file, **kwargs): + if not os.path.isfile(file): + raise Exception("%s file not found!" % file) + try: + # extra_vars = {} # 额外的参数 sudoers.yml以及模板中的参数,它对应ansible-playbook test.yml --extra-vars "host='aa' name='cc' " + self.variable_manager.extra_vars = kwargs + + playbook = PlaybookExecutor(playbooks=['' + file], inventory=self.inventory, + variable_manager=self.variable_manager, + loader=self.loader, options=self.options, passwords=self.passwords) + + playbook._tqm._stdout_callback = self.results_callback + playbook.run() + except Exception as e: + print("error:", e.message) + + ##定义字典用于接收或者处理结果 + result_raw = {'success': {}, 'failed': {}, 'unreachable': {}, 'skipped': {}, 'status': {}} + + # 循环打印这个结果,success,failed,unreachable需要每个都定义一个 + for host, result in self.results_callback.task_ok.items(): + result_raw['success'][host] = result._result + for host, result in self.results_callback.task_failed.items(): + result_raw['failed'][host] = result._result + for host, result in self.results_callback.task_unreachable.items(): + result_raw['unreachable'][host] = result._result + + # for host, result in self.results_callback.task_skipped.items(): + # result_raw['skipped'][host] = result._result + # + # for host, result in self.results_callback.task_stats.items(): + # result_raw['status'][host] = result._result + + result_full = copy.deepcopy(result_raw) + result_json = self.deal_result(result_raw) + if not result_json['failed']: + return result_json + else: + return result_full + + +class AnsiInterface(AnsibleAPI): + def __init__(self, hostfile, *args, **kwargs): + super(AnsiInterface, self).__init__(hostfile, *args, **kwargs) + + def copy_file(self, src=None, dest=None): + """ + copy file + """ + module_args = "src=%s dest=%s" % (src, dest) + result = self.run('copy', module_args) + return result + + def exec_command(self, cmds): + """ + commands + """ + result = self.run('command', cmds) + return result + + def exec_script(self, path): + """ + 在远程主机执行shell命令或者.sh脚本 + """ + result = self.run('shell', path) + return result diff --git a/mcrypto.py b/mcrypto.py new file mode 100644 index 0000000..37c350c --- /dev/null +++ b/mcrypto.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +# pip install pycryptodome + +from Crypto.Cipher import AES +from binascii import b2a_hex, a2b_hex + +AES_LENGTH = 16 + + +class Crypto(): + def __init__(self, key): + self.key = key + self.mode = AES.MODE_ECB + self.cryptor = AES.new(self.pad_key(self.key).encode(), self.mode) + + # 加密文本text必须为16的倍数,补足为16的倍数 + def pad(self, text): + while len(text) % AES_LENGTH != 0: + text += '\0' + return text + + # 加密密钥需要长达16位字符,所以进行空格拼接 + def pad_key(self,key): + while len(key) % AES_LENGTH != 0: + key += '\0' + return key + + # 加密 + def encrypt(self, text): + try: + # 这里密钥key 长度必须为16(AES-128)、24(AES-192)、或32(AES-256)Bytes 长度.目前AES-128足够用 + # 加密的字符需要转换为bytes + self.ciphertext = self.cryptor.encrypt(self.pad(text).encode()) + + # 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题 + # 所以这里统一把加密后的字符串转化为16进制字符串 + return b2a_hex(self.ciphertext) + except Exception as e: + return e + + # 解密 + def decrypt(self, text): + try: + plain_text = self.cryptor.decrypt(a2b_hex(text)).decode() + return plain_text.rstrip('\0') + except Exception as e: + return e + + +if __name__ == '__main__': + pc = Crypto('kingsome') # 初始化密钥 + e = pc.encrypt("123456") + d = pc.decrypt(e) + print(e, d) diff --git a/mftp.py b/mftp.py new file mode 100644 index 0000000..4677794 --- /dev/null +++ b/mftp.py @@ -0,0 +1,106 @@ +#!/usr/bin/python +# -*- coding:utf-8 _*- +""" +@author: pengtao +@file: mftp.py +@time: 2020/04/07 +""" + +import os +import sys +import time +import socket +import ftplib + + +class MyFTP: + def __init__(self, data): + self.host = data['host'] + self.port = data.get('port', 21) + self.ftp_dirs = data['ftp_dirs'] + self.user = data['user'] + self.passwd = data['passwd'] + + def get_file_size(self, type='text'): + with ftplib.FTP(self.ftp_dirs) as ftp: + try: + ftp.login(self.user, self.passwd) + if type == 'text': + size = ftp.size('debian/README') + elif type == "binary": + # TYPE A for ASCII mode + ftp.sendcmd('TYPE I') + size = ftp.size('debian/ls-lR.gz') + else: + return None + except ftplib.all_errors as e: + print('FTP error:', e) + return size + + def create_director(self): + with ftplib.FTP(self.ftp_dirs) as ftp: + try: + ftp.login(self.user, self.passwd) + wdir = ftp.pwd() + print(wdir) + + ftp.cwd('debian') + + wdir2 = ftp.pwd() + print(wdir2) + + except ftplib.all_errors as e: + print('FTP error:', e) + + def upload(self): + with ftplib.FTP(self.ftp_dirs) as ftp: + filename = 'README' + try: + ftp.login('user7', 's$cret') + ftp.cwd(self.ftp_path) + ftp.set_pasv(1) + local_file = os.path.join(self.local, filename) + with open(local_file, 'rb') as fp: + res = ftp.storlines("STOR " + filename, fp) + if not res.startswith('226 Transfer complete'): + print('Upload failed') + except ftplib.all_errors as e: + print('FTP error:', e) + + def download(self): + with ftplib.FTP(self.ftp_dirs) as ftp: + file_orig = '/debian/README' + file_copy = 'README' + try: + ftp.login() + with open(file_copy, 'w') as fp: + res = ftp.retrlines('RETR ' + file_orig, fp.write) + if not res.startswith('226 Transfer complete'): + print('Download failed') + if os.path.isfile(file_copy): + os.remove(file_copy) + + except ftplib.all_errors as e: + print('FTP error:', e) + + if os.path.isfile(file_copy): + os.remove(file_copy) + + +if __name__ == "__main__": + data = {'host': '10.10.3.10', 'user': 'ftp', 'passwd': 'kingsome2018'} + my_ftp = MyFTP(data) + + # 下载单个文件 + my_ftp.download_file("G:/ftp_test/XTCLauncher.apk", "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk") + + # 下载目录 + # my_ftp.download_file_tree("G:/ftp_test/", "App/AutoUpload/ouyangpeng/I12/") + + # 上传单个文件 + # my_ftp.upload_file("G:/ftp_test/Release/XTCLauncher.apk", "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk") + + # 上传目录 + # my_ftp.upload_file_tree("G:/ftp_test/", "/App/AutoUpload/ouyangpeng/I12/") + + my_ftp.close() diff --git a/mlog.py b/mlog.py new file mode 100644 index 0000000..c8ce71e --- /dev/null +++ b/mlog.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +import sys +import logging + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s', + datefmt='%Y-%m-%d %A %H:%M:%S', filename="{0}.log".format(sys.argv[0]), filemode='a') +console = logging.StreamHandler() +console.setLevel(logging.DEBUG) # 屏幕显示的级别 +formatter = logging.Formatter('%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s') +console.setFormatter(formatter) +logging.getLogger().addHandler(console) +log = logging.getLogger(__name__) + + +def get_log(): + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s', + datefmt='%Y-%m-%d %A %H:%M:%S', filename="/data/logs/1.log", filemode='a') + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) # 屏幕显示的级别 + formatter = logging.Formatter('%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s') + console.setFormatter(formatter) + logging.getLogger().addHandler(console) + log = logging.getLogger(__name__) + return log + + +def define_logger(filename="/data/logs/mylog.log"): + logger = logging.getLogger("main") + logger.setLevel(logging.INFO) + # 设置输出格式 + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # 设置日志文件处理器 + fh = logging.FileHandler(filename) + fh.setFormatter(formatter) # 为这个处理器添加格式 + + # 设置屏幕stdout输出处理器 + sh = logging.StreamHandler(stream=None) + sh.setFormatter(formatter) + + # 把处理器加到logger上 + logger.addHandler(fh) + logger.addHandler(sh) + diff --git a/mmail.py b/mmail.py new file mode 100644 index 0000000..900f390 --- /dev/null +++ b/mmail.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +def send_mail(sub, body, send_to): + import requests + url = "http://10.10.3.16:50010/mail/api/v2.0/send" + data = {"sub": sub, "content": body, "sendto": send_to} + r = requests.post(url=url, data=data) + if r.status_code == requests.codes.ok: + print(True) + else: + print(False) + + +def send_ops_mail(sub, body, send_to): + import requests + url = "http://10.10.3.10:5011/mail/api/v2.0/alarm" + data = {"sub": sub, "content": body, "sendto": send_to} + r = requests.post(url=url, data=data) + if r.status_code == requests.codes.ok: + print(True) + else: + print(False) + diff --git a/mmongo.py b/mmongo.py new file mode 100644 index 0000000..f3b0396 --- /dev/null +++ b/mmongo.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +import pymongo +import re +from bson.json_util import loads # load MongoDB Extended JSON data + + +class MongodbBase: + + def __init__(self, **args): + self.host = args.get('host') + try: + self.port = int(args.get('port')) + except Exception as e: + raise Exception("get args about port failed,{}".format(e)) + self.dbname = args.get('dbname') + self.collname = args.get('collname', self.dbname) + + try: + # 建立mongodb连接 选择对应的集合 + self.client = pymongo.MongoClient(self.host, self.port) + self.db = self.client[self.dbname] + self.coll = self.db[self.collname] + except Exception as e: + raise ValueError( + 'mongodb connect {}{}{}{} error,out was {1}'.format(self.host, self.port, self.db, self.coll, e)) + + def __del__(self): + self.client.close() + + def select(self, myquery): + try: + data = self.coll.find(myquery={}) + return data + except Exception as e: + print("get data error ,{}".format(e)) + return None + + # 查询 + def query(self, qc=list()): + try: + if isinstance(qc, list): + rel = self.coll.aggregate(qc) + return rel + else: + raise ValueError("arg type is not list!") + except Exception as e: + return e + + # 更新 + def update(self, match_data, update_data): + try: + rel = self.coll.update_many(match_data, {"$set": update_data}) + return "匹配到{0}条数据,修改{1}条数据".format(rel.matched_count, rel.modified_count) + except Exception as e: + return e + + # 插入 + def insert(self, inert_data): + try: + if isinstance(inert_data, list): + rel = self.coll.insert_many(inert_data) + return rel.inserted_ids + else: + raise ValueError("arg type is not list!") + except Exception as e: + return e + + # 导入json数据 + def insert_file(self, file): + try: + ids = self.coll.distinct('_id') + f = open(file, 'r', encoding='utf-8') + data = list() + tmp_data = list() + for i in f: + d = loads(i) + if d['_id'] in ids: + tmp_data.append(d['_id']) + else: + data.append(d) + if tmp_data: + return "数据中包含已存在ObjectId" + else: + rel = self.coll.insert_many(data) + return rel.acknowledged + except Exception as e: + return e + + # 删除 + def delete(self, del_data): + try: + rel = self.coll.delete_many(del_data) + return "{0}个条目已删除".format(rel.deleted_count) + except Exception as e: + return e + + +if __name__ == "__main__": + args = dict() + args['host'] = '127.0.0.1' + # args['host'] = '10.10.5.4' + args['port'] = 27017 + # args['dbname'] = 'dalmatian-production' + args['dbname'] = 'test' + # args['collname'] = 'redemption_record' + match_data = {'ppp': '99999999'} + update_data = {'ppp': '7777777'} + + # 查询条件 + qc = [{"$addFields": {"ouid": {"$toObjectId": "$uid"}}}, + {'$lookup': {'from': 'users', 'localField': 'ouid', 'foreignField': '_id', 'as': 'userinfo'}}] + m = MongodbBase(**args) + # r = m.update(match_data, update_data) + r = m.query() + ii = m.insert_file(file='random_users.json') + print(ii) # for i in r: # print(i) diff --git a/mmysql.py b/mmysql.py new file mode 100644 index 0000000..6f9c5fe --- /dev/null +++ b/mmysql.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +""" +date: 2016/07/11 +role: mysql的增删改查类 +usage: m = mysqlBase(host='xxx',db='xxx',user='xxx',pwd='xxx') 实例化 + m.insert('core',{'host_name':'ccc','process_name':'ddd','ip_addr':'192.168.136.41','status':'4'}) + m.update('table',{'field1':'value1','field2':'value2'},'id=1') 更新表名 字段名:值 条件 + m.delete('core','status=5 and id=12') + m.change("update core set a='aaa' where id=1") 可以多条插入 + m.query("select * from core") +""" +from .mlog import log +import warnings + +try: + import MySQLdb +except: + import pymysql + + pymysql.install_as_MySQLdb() + import MySQLdb + + +###mysql操作类 +class MysqlBase: + + ###连接数据库 + def __init__(self, **args): + + ###获取参数 + self.host = args.get('host', 'localhost') + self.user = args.get('user') + self.pswd = args.get('pswd') + self.db = args.get('db', 'mysql') + self.port = args.get('port', '3306') + self.charset = args.get('charset', 'utf8') + + try: + self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pswd, db=self.db, + port=int(self.port), charset=self.charset) + + self.curs = self.conn.cursor() + + self.curs.execute('SET NAMES utf8') + except: + log.error('%s mysql connect error' % self.host) + raise ValueError('mysql connect error %s' % self.host) + + ###释放资源 + def __del__(self): + self.curs.close() + self.conn.close() + + ###插入 + + def insert(self, table, data): + _field = ','.join(['`%s`' % (k_insert) for k_insert in data.keys()]) + _value = ','.join(["'%s'" % (str(v_insert).replace("'", "\'")) for v_insert in data.values()]) + ###拼接成sql语句 + _sql = 'INSERT INTO `%s`(%s) VALUES(%s)' % (table, _field, _value) + + ###执行 + self.curs.lastrowid = 0 + try: + self.curs.execute(_sql) + ###提交 + self.conn.commit() + # log.info('%s insert ' % _sql) + + except: + self.conn.rollback() + log.error('%s insert error' % _sql) + raise ValueError('112,insert error %s' % _sql) + + return self.curs.lastrowid + + ###更新 + def update(self, table, data, condition): + _field = ','.join(["`%s`='%s'" % (k_update, str(data[k_update]).replace("'", "\'")) for k_update in data]) + + _sql = 'UPDATE `%s` SET %s WHERE %s' % (table, _field, condition) + ###执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + ###提交 + self.conn.commit() + # log.info('%s update ' % _sql) + except Exception: + log.error("run {_sql} failed",exc_info=True) + self.conn.rollback() + # log.error('%s update error' % _sql) + raise ValueError('update error %s' % _sql) + + return resNum + + ###删除 + def delete(self, table, condition): + _sql = 'DELETE FROM `%s` WHERE %s' % (table, condition) + + ###执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + ###提交 + self.conn.commit() + # log.info('%s delete ' % _sql) + + except: + self.conn.rollback() + log.error('%s delete error' % _sql) + raise ValueError('112,delete error %s' % _sql) + + return resNum + + ###直接给修改语句执行 + def change(self, sql, many=False): + ###过滤unknow table的warning + warnings.filterwarnings('ignore') + resNum = 0 + if many: + try: + ###多条同时插入 + resNum = self.curs.executemany(sql, many) + self.conn.commit() + # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('exec error %s' % sql) + else: + try: + resNum = self.curs.execute(sql) + ###提交 + self.conn.commit() + # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('112,exec error %s' % sql) + + return resNum + + ###查询 + def query(self, sql): + res = '' + try: + self.curs.execute(sql) + res = self.curs.fetchall() + # log.info('%s query ' % sql) + + except: + log.error('%s query error' % sql) + + # raise ValueError('query error %s'% sql) + + return res + + +if __name__ == "__main__": + args = dict() + args['host'] = '172.16.17.164' + args['user'] = 'miles' + args['pswd'] = 'aspect' + args['db'] = 'test' + sql_sel = "select * from bigdata_host limit 5" + m = MysqlBase(**args) + data = m.query(sql=sql_sel) + m.insert('bigdata_host', {'hostname': 'ccc', 'remark': 'ddd', 'up_addr_p': '192.168.136.41', 'states': '4', + 'enter_time': '2017-03-13'}) + m.delete('bigdata_host', 'hostname="ccc"') diff --git a/mping.py b/mping.py new file mode 100644 index 0000000..c1c4e25 --- /dev/null +++ b/mping.py @@ -0,0 +1,72 @@ +#!/usr/bin/python +# -*- coding:utf-8 _*- +""" +@author: pengtao +@file: mping.py +@time: 2020/03/26 +""" + +import os +import socket +import struct +import array + + +class Pinger(object): + def __init__(self, timeout=3): + self.timeout = timeout + self.__id = os.getpid() + self.__data = struct.pack('h', 1) # h代表2个字节与头部8个字节组成偶数可进行最短校验 + + @property + def __icmpSocket(self): # 返回一个可以利用的icmp原对象,当做属性使用 + icmp = socket.getprotobyname("icmp") # 指定服务 + sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) # socket.SOCK_RAW原生包 + return sock + + + def __doCksum(self, packet): # 校验和运算 + words = array.array('h', packet) # 将包分割成2个字节为一组的网络序列 + sum = 0 + for word in words: + sum += (word & 0xffff) # 每2个字节相加 + sum = (sum >> 16) + (sum & 0xffff) # 因为sum有可能溢出16位所以将最高位和低位sum相加重复二遍 + sum += (sum >> 16) # 为什么这里的sum不需要再 & 0xffff 因为这里的sum已经是16位的不会溢出,可以手动测试超过65535的十进制数字就溢出了 + return (~sum) & 0xffff # 最后取反返回完成校验 + + @property + def __icmpPacket(self): # icmp包的构造 + header = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) + packet = header + self.__data + cksum = self.__doCksum(packet) + header = struct.pack('bbHHh', 8, 0, cksum, self.__id, 0) # 将校验带入原有包,这里才组成头部,数据部分只是用来做校验所以返回的时候需要返回头部和数据相加 + return header + self.__data + + + def sendPing(self, target_host): + + try: + socket.gethostbyname(target_host) + + sock = self.__icmpSocket + sock.settimeout(self.timeout) + + packet = self.__icmpPacket + + sock.sendto(packet, (target_host, 1)) # 发送icmp包 + + ac_ip = sock.recvfrom(1024)[1][0] + print('[+] %s active' % (ac_ip)) + except Exception: + sock.close() + return False + return True + + +def main(): + pp = Pinger() + pp.sendPing('192.168.100.20') + + +if __name__ == '__main__': + main() diff --git a/mtelnet.py b/mtelnet.py new file mode 100644 index 0000000..5989c2c --- /dev/null +++ b/mtelnet.py @@ -0,0 +1,58 @@ +import logging +import telnetlib +import time + + +class TelnetClient(): + def __init__(self,): + self.tn = telnetlib.Telnet() + + # 此函数实现telnet登录主机 + def login_host(self,host_ip,username,password): + try: + # self.tn = telnetlib.Telnet(host_ip,port=23) + self.tn.open(host_ip,port=23) + except: + logging.warning('%s网络连接失败'%host_ip) + return False + # 等待login出现后输入用户名,最多等待10秒 + self.tn.read_until(b'login: ',timeout=10) + self.tn.write(username.encode('ascii') + b'\n') + # 等待Password出现后输入用户名,最多等待10秒 + self.tn.read_until(b'Password: ',timeout=10) + self.tn.write(password.encode('ascii') + b'\n') + # 延时两秒再收取返回结果,给服务端足够响应时间 + time.sleep(2) + # 获取登录结果 + # read_very_eager()获取到的是的是上次获取之后本次获取之前的所有输出 + command_result = self.tn.read_very_eager().decode('ascii') + if 'Login incorrect' not in command_result: + logging.warning('%s登录成功'%host_ip) + return True + else: + logging.warning('%s登录失败,用户名或密码错误'%host_ip) + return False + + # 此函数实现执行传过来的命令,并输出其执行结果 + def execute_some_command(self,command): + # 执行命令 + self.tn.write(command.encode('ascii')+b'\n') + time.sleep(2) + # 获取命令结果 + command_result = self.tn.read_very_eager().decode('ascii') + logging.warning('命令执行结果:\n%s' % command_result) + + # 退出telnet + def logout_host(self): + self.tn.write(b"exit\n") + +if __name__ == '__main__': + host_ip = '192.168.220.129' + username = 'root' + password = 'abcd1234' + command = 'whoami' + telnet_client = TelnetClient() + # 如果登录结果返加True,则执行命令,然后退出 + if telnet_client.login_host(host_ip,username,password): + telnet_client.execute_some_command(command) + telnet_client.logout_host() \ No newline at end of file diff --git a/mtga.py b/mtga.py new file mode 100644 index 0000000..b1497d2 --- /dev/null +++ b/mtga.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- + +import requests +import json +import datetime +import os +from tgasdk.sdk import * +import pdb +import pymysql +from ops.mmysql import MysqlBase + + +# 初始化命令:pip install ThinkingDataSdk + +class TgaUserData(object): + def __init__(self): + self.gameid = 0 + self.accountid = "" + self.distinctid = "" + self._nickname = "" + self._sex = 0 + self._avatar_url = "" + self._city = "" + self._province = "" + self._country = "" + self.createtime = "" + self.is_hide = 0 + self.lastlogon = "" + self.firstlogonip = "" + # 未实现 先占坑 + self.is_real = 1 + self.unionid = "" + self.wid = "" + self.channel = 0 + self.sid = "" + self.ptid = "" + self.from_appid = "" + self.gamescore_his = 0 + self.gamescore_week = 0 + self.gamescore_last = 0 + + def check_byte(self, i): + if not i: + return "" + elif isinstance(i, bytes): + return pymysql.escape_string(i.decode()) + else: + return pymysql.escape_string(i) + + @property + def nickname(self): + return self._nickname + + @nickname.setter + def nickname(self, nickname): + self._nickname = self.check_byte(nickname) + + @property + def sex(self): + return self._sex + + @sex.setter + def sex(self, sex): + if sex: + try: + self._sex = int(sex) + except Exception as e: + print("ERROR get sex property from tga failed! output was {} ".format(e)) + self._sex = 0 + else: + self._sex = 0 + + @property + def city(self): + return self._city + + @city.setter + def city(self, city): + self._city = self.check_byte(city) + + @property + def avatar_url(self): + return self._avatar_url + + @avatar_url.setter + def avatar_url(self, avatar_url): + self._avatar_url = self.check_byte(avatar_url) + + @property + def province(self): + return self._province + + @province.setter + def province(self, province): + self._province = self.check_byte(province) + + @property + def country(self): + return self._country + + @country.setter + def country(self, country): + self._country = self.check_byte(country) + + @property + def distinctid(self): + return self._distinctid + + @distinctid.setter + def distinctid(self, distinctid): + self._distinctid = self.check_byte(distinctid) + + @property + def from_appid(self): + return self._from_appid + + @from_appid.setter + def from_appid(self, from_appid): + self._from_appid = self.check_byte(from_appid) + + +class CheckWords(object): + def __init__(self): + pass + + def run(self, item): + if not item: + return 0 + if isinstance(item, (str, bytes, int, float)): + try: + return int(item) + except Exception as e: + print("return values failed,output was {}".format(e)) + return 0 + + if isinstance(item, (list, tuple)): + return self.run(item[0]) + + +# 按提供的gameid从tga数据库中查询并输出其对应的url,suffix,appid,api_key,tgaid以dict形式存放 +class GetTgaConfig(): + def __init__(self): + self.url = "http://10.10.3.17:8992/querySql" + self.TGA = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'tga'} + + def get_api_key(self, gameid, channel=6001): + item = {} + item['url'] = self.url + sql = "select suffix,appid,api_secret,tgaid from tgainfo where gameid={} and channelid={} and " \ + "in_used=1;".format(gameid, channel) + t = MysqlBase(**self.TGA) + data = t.query(sql) + if data: + item['suffix'], item['appid'], item['api_key'], item['tgaid'] = data[0] + return item + + +class FromTga: + def __init__(self, url, token): + if not token: + raise Exception("{0} token not found in env !") + self.url = url + self.token = token + self.output = "/data/logs/tga-report/" + + def get_data(self, sql): + data = {'token': self.token, 'sql': sql} + r = requests.post(self.url, data) + if r.status_code != requests.codes.ok: + print("connect tga failed!") + return None + + out = r.content.decode('utf-8') + if json.loads(out.split('\r\n')[0]).get('return_code', None) != 0: + # raise Exception("get data from tga failed!") + print("get data from tga failed!,sql was {}".format(sql)) + return None + + data_out = out.split('\r\n')[1:] + output = list() + for row in data_out: + if row: + try: + output.append(json.loads(row)) + except Exception as e: + print("转化数据失败,{} 提示为{}".format(row, e)) + return output + + def init_tga_write(self, tgaid): + ''' + from tgasdk.sdk import TGAnalytics, BatchConsumer, LoggingConsumer, AsyncBatchConsumer + 也可引入TGAnalytics与指定的Consumer + ''' + # now = datetime.date.today().strftime('%Y%m%d%H%M%S') + # # 初始化SDK + # filename = "{}/report_{}_{}.log".format(self.output, project, now) + # 检查目录是否存在,如不存在创建之 + paths = "{}/{}".format(self.output, tgaid) + if not os.path.isdir(paths): + os.makedirs(paths, mode=0o755) + + self.tga = TGAnalytics(LoggingConsumer(paths)) + + def _close_tga(self, tga): + self.tga.flush() + self.tga.close() + + def _split_user_data(self, data): + distinct_id = data.get('distinct_id', None) + account_id = data.get('account_id', None) + + if not (distinct_id or account_id): + print("distinct_id 或 account_id 必须有一则有值!") + return None + + if distinct_id: + data.pop('distinct_id') + if account_id: + data.pop('account_id') + + return distinct_id, account_id, data + + def put_event_data(self, data, event_name="Payment"): + # tga = self._init_tga_write() + try: + distinct_id, account_id, new_data = self._split_user_data(data) + except Exception as e: + print("拆解数据错误,输出为{}.请检查!".format(e)) + return False + + # properties = { + # "#time": datetime.datetime.now(), + # "#ip": "192.168.1.1", + # "Product_Name": "月卡", + # "Price": 30, + # "OrderId": "abc_123" + # } + + # 上传事件,包含账号ID与访客ID + try: + self.tga.track(distinct_id, account_id, event_name, new_data) + except Exception as e: + print("write to tga failed,output was {}".format(e)) + # self._close_tga(tga) + return False + # finally: + # self._close_tga(tga) + return True + + def put_user_data(self, data, method='user_set'): + # tga = self._init_tga_write() + try: + distinct_id, account_id, new_data = self._split_user_data(data) + except Exception as e: + print("拆解数据错误,输出为{}.请检查!".format(e)) + return False + try: + if method.lower() == "user_set": + self.tga.user_set(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_setonce": + self.tga.user_setOnce(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_add": + self.tga.user_add(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_del": + self.tga.user_del(account_id=account_id, distinct_id=distinct_id) + else: + print("请提供用户操作类型 [user_set/user_setOnce/user_add/user_del] !") + return False + except Exception as e: + print("write to tga failed,output was {}".format(e)) + return False + # finally: + # self._close_tga(tga) + return True + + + +def main(): + url = "http://10.10.3.17:8992/querySql" + # sql = "SELECT \"#server_time\",localuuid,ext FROM v_event_3 where \"#event_name\"='event_1_1'" + sql = "SELECT distinct from_appid FROM v_event_22 where \"$part_event\"='event_11_1' and gameid='2001' and \"$part_date\"='2019-06-18'" + token = "ESnhwwLtVu7zO2h6SSTEZ1jYagbOet0Kur0XnpG9fVJF5ROsqUkcNO0inVyFtQd1" + t = FromTga(url, token) + # t._init_tga_write() + # data={'account_id': 1012, 'distinct_id': 1012, 'gameid': 1012, 'from_appid': 'wx62d9035fd4fd2059', 'time': '2019-03-10', + # 'new_user': 2, 'active_user': 4, 'avg_runing_time': 0, 'old_user_login': 2, 'newuser_rungame': 2, + # 'newuser_rungame_rate': 100.0, 'newuser_qlty': 0} + # t.put_event_data(data) + print(json.dumps(t.get_data(sql))) + + +if __name__ == "__main__": + main() diff --git a/myredis.py b/myredis.py new file mode 100644 index 0000000..445aa0c --- /dev/null +++ b/myredis.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +import redis +import json +import logging +import pdb + +log = logging.getLogger(__name__) + +dev_conf = {"host": "10.10.9.5", "port": "6379"} + + +class Myredis(): + def __init__(self, conf): + if conf.get('passwd', None): + self.r = redis.Redis(host=conf['host'], port=conf['port'], password=conf['passwd'], charset='utf8', + db=conf.get('db') or 0, + decode_responses=True) + else: + self.r = redis.Redis(host=conf['host'], port=conf['port'], charset='utf8', db=conf.get('db') or 0, + decode_responses=True) + + def get_keys(self, keyword): + return self.r.keys(keyword) + + def get_one_key(self, key): + if self.r.type(key) == 'hash': + return self.r.hgetall(key) + elif self.r.type(key) == 'list': + return self.r.lrange(key) + elif self.r.type(key) == 'zset': + return self.r.smembers(key) + elif self.r.type(key) == 'string': + return self.r.get(key) + else: + # log.error(f"{key} {self.r.type(key)} not in define!") + return None + + def set_redis(self, k, v): + if isinstance(v, str): + self.r.set(k, v) + elif isinstance(v, list): + for i in v: + self.r.lpush(k, i) + elif isinstance(v, dict): + self.r.hmset(k, v) + elif isinstance(v, set): + for i in v: + self.r.sadd(k, i) + else: + log.error(f"{v} type not in define!") + + +def main(): + mr = Myredis(dev_conf) + keys = mr.get_keys("*") + for k in keys: + print(f"key={k},values={mr.get_one_key(k)}") + + +if __name__ == "__main__": + main() diff --git a/myrequests.py b/myrequests.py new file mode 100644 index 0000000..6fec055 --- /dev/null +++ b/myrequests.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +import requests +import json + + +def myrequests(url, data, methods="get"): + requests.adapters.DEFAULT_RETRIES = 5 # 增加重连次数 + s = requests.session() + s.keep_alive = False # 关闭多余连接 + + if methods == "get": + return s.get(url=url, data=data) + elif methods == "post": + if isinstance(data, "str"): + headers = {'Content-Type': 'application/json'} + return s.get(url=url, headers=headers, data=json.dumps(data)) + elif isinstance(data, ("tuple", "dict", "list")): + return s.get(url=url, json=data) + else: + print(f"pls input data not {type(data)}\t{data}") + return False + else: + print(f"pls input methods not {methods}") + return False diff --git a/plog.py b/plog.py new file mode 100644 index 0000000..8cd8de8 --- /dev/null +++ b/plog.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import logging + + +def ini_log(filename="/data/logs/aa.log"): + logging.basicConfig(level=logging.ERROR, + format='%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s', + datefmt='%Y-%m-%d %A %H:%M:%S', filename=filename, filemode='a') + console = logging.StreamHandler() + console.setLevel(logging.info) # 屏幕显示的级别 + formatter = logging.Formatter('%(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s') + console.setFormatter(formatter) + logging.getLogger().addHandler(console) + +def define_logger(filename="/data/logs/aa.log",debug=True): + logger = logging.getLogger("") + if debug==True: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.ERROR) + + # 设置输出格式 + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # 设置日志文件处理器 + fh = logging.FileHandler(filename) + fh.setFormatter(formatter) # 为这个处理器添加格式 + + # 设置屏幕stdout输出处理器 + formatter_stdout = logging.Formatter('%(name)s - %(levelname)s - %(message)s') + sh = logging.StreamHandler(stream=None) + sh.setFormatter(formatter_stdout) + + # 把处理器加到logger上 + logger.addHandler(fh) +# logger.addHandler(sh) diff --git a/qcloud_api.py b/qcloud_api.py new file mode 100644 index 0000000..d1515aa --- /dev/null +++ b/qcloud_api.py @@ -0,0 +1,53 @@ +# -*- coding: utf8 -*- +# author:tan +# date:2018-12-10 + +""" +腾讯云api通用类 +""" + +from ops.qcloud_sign import QcSign +import requests +import json +import random +import time +import os + + +# 腾讯云api操作 +class QcApi(object): + def __init__(self, endpoint, data, v='3.0'): + self.secret_id = os.environ.get('SECRET_ID') + self.secret_key = os.environ.get('SECRET_KEY') + self.endpoint = endpoint + self.method = "POST" + self.version = v + + self.data = dict() + for i in data: + self.data[i] = data.get(i) + + # 公共参数 + if self.version == '2.0': + if not data.get('Region'): + self.data['Region'] = 'bj' + else: + if not data.get('Region'): + self.data['Region'] = 'ap-beijing' + if self.version != '3.0': + self.data['Version'] = self.version + else: + self.data['Version'] = '2017-03-12' + self.data['SecretId'] = self.secret_id + self.data['Timestamp'] = int(time.time()) # 时间戳 + self.data['Nonce'] = random.randint(1000, 1000000) # 随机正整数 + sign = QcSign(self.secret_key, self.method, self.endpoint, self.data, self.version) + self.data['Signature'] = sign.sign_str() + + def workflow(self): + try: + r = requests.post('https://' + self.endpoint, data=self.data, timeout=10) + rel = json.loads(r.content.decode('utf-8')) + return rel + except: + return False \ No newline at end of file diff --git a/qcloud_api_v2.py b/qcloud_api_v2.py new file mode 100644 index 0000000..a964412 --- /dev/null +++ b/qcloud_api_v2.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import os + +from tencentcloud.common import credential +from tencentcloud.common.profile.client_profile import ClientProfile +from tencentcloud.common.profile.http_profile import HttpProfile +from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException +from tencentcloud.cvm.v20170312 import cvm_client, models +from ops.mmysql import MysqlBase + + +class Qclound(): + def __init__(self, area="ap-beijing"): + secretid = os.getenv("miles_id") + secretkey = os.getenv("miles_key") + + cred = credential.Credential(secretid, secretkey) + httpProfile = HttpProfile() + httpProfile.endpoint = "cvm.tencentcloudapi.com" + + clientProfile = ClientProfile() + clientProfile.httpProfile = httpProfile + self.client = cvm_client.CvmClient(cred, area, clientProfile) + + def start_instance(self, instanceid): + try: + req = models.StartInstancesRequest() + params = '{"InstanceIds":["%s"]}' % instanceid + req.from_json_string(params) + + resp = self.client.StartInstances(req) + print(resp.to_json_string()) + data = resp.to_json_string() + return True, data + + except TencentCloudSDKException as err: + print(err) + return False, err + + def stop_instance(self, instanceid): + try: + req = models.StopInstancesRequest() + params = '{"InstanceIds":["%s"],"StoppedMode":"STOP_CHARGING"}' % instanceid + req.from_json_string(params) + + resp = self.client.StopInstances(req) + print(resp.to_json_string()) + data = resp.to_json_string() + return True, data + except TencentCloudSDKException as err: + print(err) + return True, err + + def desc_instance(self, instanceid): + try: + req = models.DescribeInstancesStatusRequest() + params = '{"InstanceIds":["%s"]}' % instanceid + req.from_json_string(params) + + resp = self.client.DescribeInstancesStatus(req) + print(resp.to_json_string()) + data = resp.InstanceStatusSet[0].InstanceState + return True, data + except TencentCloudSDKException as err: + print(err) + return True, err + + def get_all_instance(self): + all = {} + try: + req = models.DescribeInstancesRequest() + for i in range(10): + params = '{"Offset":%s,"Limit":20}' % str(i) + req.from_json_string(params) + resp = self.client.DescribeInstances(req) + if resp: + for item in resp.InstanceSet: + if item: + all[item.PrivateIpAddresses[0]] = item.InstanceId + else: + break + except TencentCloudSDKException as err: + print(err) + return all + + def get_instanceid(self, ip): + all = self.get_all_instance() + if ip in all.keys(): + return all[ip] + else: + return None + + +if __name__ == "__main__": + qc = Qclound('ap-beijing') + # aa = qc.get_all_instance() + #print(aa) + id = qc.get_instanceid('10.10.7.17') + print("2 {}".format(id)) + if id: + qc.desc_instance(id) + qc.stop_instance(id) + #qc.start_instance(id) diff --git a/qcloud_data_api.py b/qcloud_data_api.py new file mode 100644 index 0000000..f45d410 --- /dev/null +++ b/qcloud_data_api.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +import requests +import redis +import json +import pdb + +TOKEN_REDIS = {'host': '10.10.3.10', 'port': 6379, 'db': 3} + + +class QcloudDataApi(): + def __init__(self, appid, secret): + self.appid = appid + self.secret = secret + pool = redis.ConnectionPool(host=TOKEN_REDIS['host'], port=TOKEN_REDIS['port'], db=TOKEN_REDIS['db'], + decode_responses=True) + self.redis = redis.Redis(connection_pool=pool) + self.token = self.get_access_token() + + def get_access_token(self): + token = self.redis.get(self.appid) + if not token: + token_url = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={" \ + "}".format(self.appid, self.secret) + r = requests.get(token_url) + if r.status_code == 200: + data = r.json() + try: + self.redis.set(self.appid, data['access_token']) + self.redis.expire(self.appid, data['expires_in']) + token = data['access_token'] + except Exception as e: + print("write token to redis failed ,{}!".format(e)) + token = None + return token + + def get_data(self): + if not self.token: + print("get token failed ,pls check it first!") + return False + self.get_getUserPortrait() + + def get_getUserPortrait(self): + url1="https://api.weixin.qq.com/datacube/getweanalysisappiddailyretaininfo?access_token={}".format(self.token) + url2="https://api.weixin.qq.com/datacube/getweanalysisappiddailyvisittrend?access_token={}".format(self.token) + url = "https://api.weixin.qq.com/datacube/getweanalysisappiduserportrait?access_token={}".format(self.token) + post_data = {} + post_data['access_token'] = self.token + post_data['end_date'] = '20190617' + post_data['begin_date'] = '20190610' + headers = {'Content-Type': 'application/json'} + r = requests.post(url=url2, headers=headers, data=json.dumps(post_data)) + if r.status_code == 200: + data = r.json() + print("{}".format(data)) + + +def main(): + appid = "wxf567b46cecf6ec58" + secret = "dea5e1a99debd567eb4532a032bdc62d" + qc = QcloudDataApi(appid, secret) + qc.get_data() + + +if __name__ == "__main__": + main() diff --git a/qcloud_sign.py b/qcloud_sign.py new file mode 100644 index 0000000..544f8f3 --- /dev/null +++ b/qcloud_sign.py @@ -0,0 +1,35 @@ +# -*- coding: utf8 -*- +# author:tan +# date:2018-12-10 +# https://cloud.tencent.com/document/api/213/15693 + +""" +腾讯云api登陆鉴权 +""" + +import base64 +import hashlib +import hmac + + +# 接口鉴权 +class QcSign: + def __init__(self, secret_key, method, endpoint, params, v='3.0'): + self.method = method + self.endpoint = endpoint + self.sign_method = hashlib.sha1 + self.secret_key = secret_key + self.params = params + self.version = v + + # 生成签名串 + def sign_str(self): + if self.version == '2.0': + s = self.method + self.endpoint + "?" + else: + s = self.method + self.endpoint + "/?" + query_str = "&".join("%s=%s" % (k, self.params[k]) for k in sorted(self.params)) + # 拼接签名原文字符串 请求方法 + 请求主机 +请求路径 + ? + 请求字符串 + string_to_sign = s + query_str + hmac_str = hmac.new(self.secret_key.encode("utf8"), string_to_sign.encode("utf8"), self.sign_method).digest() + return base64.b64encode(hmac_str) \ No newline at end of file diff --git a/vcs.py b/vcs.py new file mode 100644 index 0000000..77e4c80 --- /dev/null +++ b/vcs.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from .mlog import log +from .common import run_cmd +import os +import shutil + + +class GitRepository(): + def __init__(self): + self.base_dir = "/data/publish/git" + + def checkout(self, git_addr, project, tag): + git_dir = "{0}/{1}".format(self.base_dir, tag) + if os.path.isdir(git_dir): + shutil.rmtree(git_dir) + + clone_cmd = "cd {0} && git clone --recursive --branch {2} {1} {2} ".format(self.base_dir, git_addr, tag) + status, out = run_cmd(clone_cmd) + if status: + log.info("git clone {0} success".format(project)) + else: + log.error("git clone {0} Failed,out was {1}".format(project, out)) + raise Exception("git clone Failed") + + # tag_dir = "{0}/{1}".format(self.base_dir, tag) + # get_tag_cmd = "cd {0} && git checkout {1} && git submodule update ".format(tag_dir, tag) + # status, out = run_cmd(get_tag_cmd) + # if status: + # log.info("git tag {0} {1} success".format(project, tag)) + # else: + # log.error("git tag {0} {1} Failed,out was {2}".format(project, tag, out)) + # raise Exception("git tag Failed") + + return True + + +class SubversionRepository(): + def __init__(self): + self.base_dir = "/data/publish/svn" + + def checkout(self, svn_add, project, tag, git_dir=None): + # 实际tag无用,只记录,显示 + if not git_dir: + git_dir = "{0}/{1}".format(self.base_dir, project) + check_out_cmd = "svn export --force --no-auth-cache --non-interactive {0} {1}".format(svn_add, git_dir) + status, out = run_cmd(check_out_cmd) + if status: + log.info("svn co {0} ,tag {1} success".format(project, tag)) + return True + else: + log.error("svn co {0} {1} Failed,out was {2}".format(project, tag, out)) + raise Exception("SVN checkout Failed!")