From 30ffb7062e55e6cf004c25f75cb81c0203cc5692 Mon Sep 17 00:00:00 2001 From: pengtao Date: Fri, 17 Apr 2020 11:37:13 +0800 Subject: [PATCH] new --- __init__.py | 1 + add_path.py | 19 +++ aliyun_api.py | 85 +++++++++++++ clearn.py | 34 ++++++ clog.py | 15 +++ collect_data_ad.py | 86 +++++++++++++ common.py | 121 +++++++++++++++++++ gamedata2001.py | 38 ++++++ gamedatabase.py | 38 ++++++ mansible.py | 230 +++++++++++++++++++++++++++++++++++ mcrypto.py | 55 +++++++++ mftp.py | 106 ++++++++++++++++ mlog.py | 46 +++++++ mmail.py | 23 ++++ mmongo.py | 118 ++++++++++++++++++ mmysql.py | 175 +++++++++++++++++++++++++++ mping.py | 72 +++++++++++ mtelnet.py | 58 +++++++++ mtga.py | 293 +++++++++++++++++++++++++++++++++++++++++++++ myredis.py | 61 ++++++++++ myrequests.py | 24 ++++ plog.py | 37 ++++++ qcloud_api.py | 53 ++++++++ qcloud_api_v2.py | 104 ++++++++++++++++ qcloud_data_api.py | 65 ++++++++++ qcloud_sign.py | 35 ++++++ vcs.py | 52 ++++++++ 27 files changed, 2044 insertions(+) create mode 100644 __init__.py create mode 100644 add_path.py create mode 100644 aliyun_api.py create mode 100644 clearn.py create mode 100644 clog.py create mode 100644 collect_data_ad.py create mode 100644 common.py create mode 100644 gamedata2001.py create mode 100644 gamedatabase.py create mode 100644 mansible.py create mode 100644 mcrypto.py create mode 100644 mftp.py create mode 100644 mlog.py create mode 100644 mmail.py create mode 100644 mmongo.py create mode 100644 mmysql.py create mode 100644 mping.py create mode 100644 mtelnet.py create mode 100644 mtga.py create mode 100644 myredis.py create mode 100644 myrequests.py create mode 100644 plog.py create mode 100644 qcloud_api.py create mode 100644 qcloud_api_v2.py create mode 100644 qcloud_data_api.py create mode 100644 qcloud_sign.py create mode 100644 vcs.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..7c68785 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- \ No newline at end of file diff --git a/add_path.py b/add_path.py new file mode 100644 index 0000000..035f196 --- /dev/null +++ b/add_path.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +import sysconfig +import os +import subprocess + +lib_path = sysconfig.get_paths()['purelib'] + +if not lib_path: + raise Exception("python lib like site-packages not found!") + +current_path = os.path.abspath('.') + +cmd = "ln -s {0} {1}/ops".format(current_path, lib_path) +cmdref = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + shell=True) +output, error_info = cmdref.communicate() +if cmdref.returncode != 0: + error_msg = "run cmd {0} failed,error was {1}".format(cmd, error_info) + raise Exception(error_msg) diff --git a/aliyun_api.py b/aliyun_api.py new file mode 100644 index 0000000..07497cb --- /dev/null +++ b/aliyun_api.py @@ -0,0 +1,85 @@ +from oss2 import SizedFileAdapter, determine_part_size +from oss2.models import PartInfo +import oss2 +from aliyunsdkcore.client import AcsClient +from aliyunsdkcdn.request.v20180510 import RefreshObjectCachesRequest +from ops.mlog import log +import os +import json +import hashlib +import base64 + +""" +pip install oss2 aliyun-python-sdk-core aliyun-python-sdk-cdn + +阿里云api通用类 +""" + + +class AliApi(object): + def __init__(self, access_key_id=None, access_key_secret=None): + self.access_key_id = access_key_id or os.environ.get('ALI_ACCESS_KEY_ID') + self.access_key_secret = access_key_secret or os.environ.get('ALI_ACCESS_KEY_SECRET') + + def calculate_file_md5(self, file_name, block_size=64 * 1024): + """计算文件的MD5 + :param file_name: 文件名 + :param block_size: 计算MD5的数据块大小,默认64KB + :return 文件内容的MD5值 + """ + with open(file_name, 'rb') as f: + md5 = hashlib.md5() + while True: + data = f.read(block_size) + if not data: + break + md5.update(data) + + return base64.b64encode(md5.digest()).decode() + + def oss_upload(self, endpoint, bucket_name, file, key): + auth = oss2.Auth(self.access_key_id, self.access_key_secret) + bucket = oss2.Bucket(auth, endpoint, bucket_name) + + total_size = os.path.getsize(file) + # determine_part_size方法用来确定分片大小。 + part_size = determine_part_size(total_size, preferred_size=100 * 1024) + + # 初始化分片。 + upload_id = bucket.init_multipart_upload(key).upload_id + parts = [] + + try: + # 逐个上传分片。 + with open(file, 'rb') as fileobj: + part_number = 1 + offset = 0 + while offset < total_size: + num_to_upload = min(part_size, total_size - offset) + # SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 + result = bucket.upload_part(key, upload_id, part_number, SizedFileAdapter(fileobj, num_to_upload), + headers={'Content-MD5': self.calculate_file_md5(file)}) + parts.append(PartInfo(part_number, result.etag)) + + offset += num_to_upload + part_number += 1 + + # 完成分片上传。 + bucket.complete_multipart_upload(key, upload_id, parts) + log.info("{0} upload complete".format(file)) + except: + log.info("{0} upload failed".format(file)) + + def cdn_refresh_dir(self, refresh_url, region='cn-shanghai'): + client = AcsClient(self.access_key_id, self.access_key_secret, region) + + request = RefreshObjectCachesRequest.RefreshObjectCachesRequest() + request.set_ObjectPath(refresh_url) + if refresh_url.endswith('/'): + request.set_ObjectType('Directory') + else: + request.set_ObjectType('File') + response = client.do_action_with_exception(request) + rel = json.loads(response.decode('utf-8')) + return rel + diff --git a/clearn.py b/clearn.py new file mode 100644 index 0000000..6b4dd7b --- /dev/null +++ b/clearn.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import os +import sys +import pdb + +current_path = sys.argv[1] +remove_list = ('pyc', 'old', 'log1') +remove_files = [] + + +def searchDirFile(rootDir, saveDir): + for dir_or_file in os.listdir(rootDir): + filePath = os.path.join(rootDir, dir_or_file) + + if os.path.isfile(filePath): + if os.path.basename(filePath).endswith() in remove_list: + print('imgBox fileName is ' + os.path.basename(filePath)) + remove_files.append(os.path.join(rootDir, filePath)) + else: + continue + elif os.path.isdir(filePath): + searchDirFile(filePath, saveDir) + else: + print('not file and dir ' + os.path.basename(filePath)) + + + +remove_files = searchDirFile(current_path, remove_files) + +pdb.set_trace() +print(f"{remove_files}") # if remove_files: +# for item in remove_files: +# os.remove(item) diff --git a/clog.py b/clog.py new file mode 100644 index 0000000..cbbaec4 --- /dev/null +++ b/clog.py @@ -0,0 +1,15 @@ +import logging.handlers + + +def get_logger(task_id): + logger = logging.getLogger(__file__) + logger.handlers.clear() + http_handler = logging.handlers.HTTPHandler( + 'ops.kingsome.cn', + '/api/log/?type=task&task_id={}'.format(task_id), + method='GET', + ) + logger.setLevel(logging.INFO) + logger.addHandler(http_handler) + + return logger diff --git a/collect_data_ad.py b/collect_data_ad.py new file mode 100644 index 0000000..4f52e35 --- /dev/null +++ b/collect_data_ad.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +import datetime +from ops.plog import define_logger +import logging +import os +from ops.mtga import FromTga, GetTgaConfig + +if os.path.isdir("/data/logs"): + define_logger("/data/logs/collect_data.log") +else: + define_logger("/tmp/collect_data.log") + +log = logging.getLogger(__name__) + + +class CollectGameDataAD: + def __init__(self, **kwargs): + self.gameid = kwargs.get('gameid', 0) + self.channelid = kwargs.get('channelid', 6001) + self.times = kwargs.get('times', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + self.date = datetime.datetime.strptime(self.times, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d") + self.ad_channel = kwargs.get('ad_channel', '') + + gc = GetTgaConfig() + self.url = gc.url + item = gc.get_api_key(self.gameid, self.channelid) + self.suffix = item.get('suffix', None) + self.tga = FromTga(url=self.url, token=item.get('api_key', None)) + + def _run_tga_sql(self, sql): + data = self.tga.get_data(sql) + try: + if data: + return int(data[0][0]) + else: + return 0 + except Exception: + log.error(f"get data ={data} error!", exc_info=True) + return 0 + + def get_DAU(self): + sql = f"""SELECT count(distinct \"#account_id\") FROM v_event_{self.suffix} where "$part_event"='event_11_1' AND gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""""" + return self._run_tga_sql(sql) + + + def get_registerUser(self): + sql = f"""SELECT count(distinct "#user_id") FROM v_user_{self.suffix} where gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND account_register_time <= TIMESTAMP '{self.times}'""" + return self._run_tga_sql(sql) + + + def get_newuser(self): + sql = f"""SELECT count(distinct "#account_id") FROM v_event_{self.suffix} where gameid='{self.gameid}' and "$part_event"='event_11_1' and ad_channel='{self.ad_channel}' and account_register_date between TIMESTAMP'{self.date} 00:00:00' and TIMESTAMP'{self.times}'""" + return self._run_tga_sql(sql) + + def get_loginuser(self): + sql = f"""SELECT count(1) FROM v_event_{self.suffix} where gameid='{self.gameid}' and +"$part_event"='event_11_1' and ad_channel='{self.ad_channel}' and "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""" + return self._run_tga_sql(sql) + + def get_sharenum(self): + sql = f"""SELECT count(1) FROM v_event_{self.suffix} where "$part_event"='event_11_10' and gameid='{self.gameid}' and ad_channel='{self.ad_channel}' and "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""" + print(f"1 {sql}") + return self._run_tga_sql(sql) + + + def get_sharebypeople(self): + sql = f"""SELECT count(distinct \"#account_id\") FROM v_event_{self.suffix} where "$part_event"='event_11_10' and gameid='{self.gameid}' and ad_channel='{self.ad_channel}' and "#server_time" BETWEEN TIMESTAMP '{self.date} 00:00:00' AND TIMESTAMP '{self.times}'""" + return self._run_tga_sql(sql) + + def get_retain(self, next_step=1): + now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + btimes = (datetime.datetime.strptime(now, '%Y-%m-%d %H:%M:%S') - datetime.timedelta(days=next_step)).strftime('%Y-%m-%d') + login_sql = f"""select count(distinct "#account_id") from v_event_{self.suffix} where +"$part_event"='event_11_1' AND gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND account_register_date +BETWEEN TIMESTAMP'{btimes} 00:00:00' AND TIMESTAMP'{btimes} 23:59:59' AND "$part_date"='{now[:10]}' """ + newuser_sql = f"""select count(distinct "#account_id") from v_event_{self.suffix} where gameid='{self.gameid}' AND ad_channel='{self.ad_channel}' AND account_register_date BETWEEN TIMESTAMP'{btimes} 00:00:00' AND TIMESTAMP'{btimes} 23:59:59'""" + loguser = self._run_tga_sql(login_sql) + newuser = self._run_tga_sql(newuser_sql) + try: + retain_rate = round((100 * loguser / newuser), 2) + except Exception: + retain_rate = 0 + log.error("collect retain failed!", exc_info=True) + print(f"2 {retain_rate} {loguser} {newuser}") + return retain_rate + diff --git a/common.py b/common.py new file mode 100644 index 0000000..d5787b6 --- /dev/null +++ b/common.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +import sys +from .mlog import log +import functools + + +def tarce(f): + @functools.wraps(f) + def decorated_function(*args, **kwargs): + print(f"func name was {__name__}, args was {f}, {args}, {kwargs}") + result = f(*args, **kwargs) + print(f"return was {result}!") + + return decorated_function + + +def send_ops_mail(sub, body, sendto): + import requests + mail_url = 'http://10.10.3.10:5011/mail/api/v2.0/alarm' + data = {"sub": sub, "content": body, "sendto": sendto} + r = requests.post(url=mail_url, data=data) + if r.status_code == 200: + log.info("send mail success!") + + else: + err_msg = "send mail failed ,output was %s" % r.content + log.error("send mail failed ,{0}".format(err_msg)) + + +def send_alter_mail(sub, body, sendto): + import requests + mail_url = 'http://10.10.3.16:50010/mail/api/v2.0/send' + data = {"sub": sub, "content": body, "sendto": sendto} + r = requests.post(url=mail_url, data=data) + if r.status_code == 200: + log.info("send mail success!") + else: + err_msg = "send mail failed ,output was %s" % r.content + log.error("send mail failed ,{0}".format(err_msg)) + + +def run_cmd(cmd): + import subprocess + msg = "Starting run: %s " % cmd + #log.info("run_cmd {0}".format(msg)) + cmdref = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + output, error_info = cmdref.communicate() + if cmdref.returncode != 0: + if isinstance(error_info, list) or isinstance(error_info, tuple): + error_info = error_info[0] + #msg = "RUN %s ERROR,error info: %s" % (cmd, error_info) + #log.error(msg) + return False, error_info + + else: + msg = "run %s success" % cmd + #log.info(msg) + # print "Run Success!!" + return True, output + + +def str_datetime(step=0): + import datetime + return (datetime.datetime.now() - datetime.timedelta(int(step))).strftime("%Y-%m-%d %H:%M:%S") + + +def str_short_time(step=0): + import datetime + return (datetime.datetime.now() - datetime.timedelta(int(step))).strftime("%Y%m%d%H%M%S") + + +def write_files(filename, data): + with open(filename, 'a+') as f: + f.write(data) + f.write('\n') + + +def read_files(filename): + with open(filename, 'r') as f: + data = f.read().strip() + return data + + +def cal_crc32(data): + import binascii + return binascii.crc32(data.encode()) + + +def cal_db_num(data, db=16): + import binascii + data_crc = binascii.crc32(data.encode()) + return int(data_crc % db) + 1 + + +def to_str(bytes_or_str): + if isinstance(bytes_or_str, bytes): + value = bytes_or_str.decode('utf-8') + else: + value = bytes_or_str + return value + + +# 返回格式化时间的下一天 +def next_day(day, i=1): + import datetime + try: + begin = datetime.datetime.strptime(day, '%Y-%m-%d') + except: + raise Exception("PLS input time as '2019-03-01!'") + next_day = (begin + datetime.timedelta(days=i)) + return datetime.datetime.strftime(next_day, "%Y-%m-%d") + + +def class2dict(a): + try: + dd = dict((name, getattr(a, name)) for name in dir(a) if + (not name.startswith('_') and not name.startswith('__') and not callable(getattr(a, name)))) + except Exception as e: + print("{} 转换到dict失败 ,{}".format(a, e)) + return None + return dd # a=open(filename,'r').read() # time.strftime("%Y%m%d %H:%M:%S") # print( '20180807 15:23:29') # # time.strptime("20180702","%Y%m%d") # time.struct_time(tm_year=2018, tm_mon=7, tm_mday=2, tm_hour=0, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=183, tm_isdst=-1) diff --git a/gamedata2001.py b/gamedata2001.py new file mode 100644 index 0000000..de14862 --- /dev/null +++ b/gamedata2001.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +class Data2001(): + def __init__(self): + self.gameid = 0 + self.fromappid = "" # 渠道ID + self.date = "" # 报表日期 + self.dau = 0 # 游戏当日的活跃用户,整数 + self.registerUser = 0 # 累计注册 + self.loginUser = 0 # 游戏当日的用户启动次数,整数 + self.newUser = 0 # 游戏当日的初次注册的用户数量,整数 + self.timedelay = 0 # 游戏当日的用户人均停留时长,单位秒,整数 + self.shareNum = 0 # 游戏当日所有用户的分享总次数,整数 + self.sharePeople = 0 # 游戏当日有过分享行为的用户数量(去重),整数 + + self.regSession = 0 # 会话注册比率,百分比,记录到百分位 + self.dauSession = 0 # 会话活跃比率,百分比,记录到百分位 + self.adExposure = 0 # banner广告曝光量,整数 + self.adClick = 0 # banner广告点击量,整数 + self.adClickRate = 0 # banner广告点击率,百分比,记录到百分位 + self.adIncome = 0 # banner广告收入,记录到百分位 + + self.movieExposure = 0 # 激励式视频曝光量,整数 + self.movieClick = 0 # 激励式视频点击量,整数 + self.movieClickRate = 0 # 激励式视频点击率,整数 + self.movieIncome = 0 # 激励式视频广告收入,记录到百分位 + self.adTotalIncome = 0 # 广告总收入,记录到百分位 + self.vPay = 0 # 虚拟支付,记录到百分位 + + self.shareRate = 0 # 用户分享率,百分比,记录到百分位 + self.shareRatePeople = 0 # 分享用户人均分享次数,记录到百分位 + self.newConversionRateSession = 0 # 会话新增转化率,百分比,记录到百分位 + self.k = 0 # K值 数字,记录到百分位 + self.totalIncome = 0 # 总收入 记录到百分位 + self.ARPU = 0 # 每用户平均收入,记录到千分位 + self.newUserSession = 0 # 会话注册人数,整数 + self.dauActiveSession = 0 # DAU会话人数 + diff --git a/gamedatabase.py b/gamedatabase.py new file mode 100644 index 0000000..f602ee9 --- /dev/null +++ b/gamedatabase.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +class Data(): + def __init__(self): + self.gameid = 0 + self.fromappid = "" # 渠道ID + self.date = "" # 报表日期 + self.dau = 0 # 游戏当日的活跃用户,整数 + self.registerUser = 0 # 累计注册 + self.loginUser = 0 # 游戏当日的用户启动次数,整数 + self.newUser = 0 # 游戏当日的初次注册的用户数量,整数 + self.timedelay = 0 # 游戏当日的用户人均停留时长,单位秒,整数 + self.shareNum = 0 # 游戏当日所有用户的分享总次数,整数 + self.sharePeople = 0 # 游戏当日有过分享行为的用户数量(去重),整数 + + self.regSession = 0 # 会话注册比率,百分比,记录到百分位 + self.dauSession = 0 # 会话活跃比率,百分比,记录到百分位 + self.adExposure = 0 # banner广告曝光量,整数 + self.adClick = 0 # banner广告点击量,整数 + self.adClickRate = 0 # banner广告点击率,百分比,记录到百分位 + self.adIncome = 0 # banner广告收入,记录到百分位 + + self.movieExposure = 0 # 激励式视频曝光量,整数 + self.movieClick = 0 # 激励式视频点击量,整数 + self.movieClickRate = 0 # 激励式视频点击率,整数 + self.movieIncome = 0 # 激励式视频广告收入,记录到百分位 + self.adTotalIncome = 0 # 广告总收入,记录到百分位 + self.vPay = 0 # 虚拟支付,记录到百分位 + + self.shareRate = 0 # 用户分享率,百分比,记录到百分位 + self.shareRatePeople = 0 # 分享用户人均分享次数,记录到百分位 + self.newConversionRateSession = 0 # 会话新增转化率,百分比,记录到百分位 + self.k = 0 # K值 数字,记录到百分位 + self.totalIncome = 0 # 总收入 记录到百分位 + self.ARPU = 0 # 每用户平均收入,记录到千分位 + self.newUserSession = 0 # 会话注册人数,整数 + self.dauActiveSession = 0 # DAU会话人数 + diff --git a/mansible.py b/mansible.py new file mode 100644 index 0000000..3f74591 --- /dev/null +++ b/mansible.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import tempfile +from collections import namedtuple +from ansible.parsing.dataloader import DataLoader +from ansible.vars.manager import VariableManager +from ansible.inventory.manager import InventoryManager +from ansible.playbook.play import Play +from ansible.executor.playbook_executor import PlaybookExecutor +from ansible.executor.task_queue_manager import TaskQueueManager +from ansible.plugins.callback import CallbackBase +import json +import pdb +import copy +import time + + +def write_host(ip_addr): + host_file = "%s/host%s" % ('/tmp', time.strftime("%Y%m%d%H%M%s")) + with open(host_file, 'a+') as f: + for x in ip_addr.split(','): + f.writelines(x + '\n') + return host_file + + +class ResultsCallback(CallbackBase): + def __init__(self, *args, **kwargs): + super(ResultsCallback, self).__init__(*args, **kwargs) + self.task_ok = {} + self.task_unreachable = {} + self.task_failed = {} + self.task_skipped = {} + self.task_stats = {} + # self.host_ok = {} + # self.host_unreachable = {} + # self.host_failed = {} + + def v2_runner_on_unreachable(self, result): + self.task_unreachable[result._host.get_name()] = result + + def v2_runner_on_ok(self, result, *args, **kwargs): + self.task_ok[result._host.get_name()] = result + + def v2_runner_on_failed(self, result, *args, **kwargs): + self.task_failed[result._host.get_name()] = result + + def v2_runner_on_skipped(self, result, *args, **kwargs): + self.task_skipped[result._host.get_name()] = result + + def v2_runner_on_stats(self, result, *args, **kwargs): + self.task_stats[result._host.get_name()] = result + + +class AnsibleAPI(object): + def __init__(self, hostfile, *args, **kwargs): + self.loader = DataLoader() + self.results_callback = ResultsCallback() + # if not os.path.isfile(hostfile): + # raise Exception("%s file not found!" % hostfile) + self.inventory = InventoryManager(loader=self.loader, sources=[hostfile]) + self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory) + self.passwords = None + Options = namedtuple('Options', + ['connection', + 'remote_user', + 'ask_sudo_pass', + 'verbosity', + 'ack_pass', + 'module_path', + 'forks', + 'become', + 'become_method', + 'become_user', + 'check', + 'listhosts', + 'listtasks', + 'listtags', + 'syntax', + 'sudo_user', + 'sudo', + 'diff']) + # 初始化需要的对象 + self.options = Options(connection='smart', + remote_user=None, + ack_pass=None, + sudo_user=None, + forks=5, + sudo=None, + ask_sudo_pass=False, + verbosity=5, + module_path=None, + become=None, + become_method=None, + become_user=None, + check=False, + diff=False, + listhosts=None, + listtasks=None, + listtags=None, + syntax=None) + + @staticmethod + def deal_result(info): + host_ips = list(info.get('success').keys()) + info['success'] = host_ips + + error_ips = info.get('failed') + error_msg = {} + for key, value in error_ips.items(): + temp = {} + temp[key] = value.get('stderr') + error_msg.update(temp) + info['failed'] = error_msg + return info + # return json.dumps(info) + + def run(self, module_name, module_args): + play_source = dict( + name="Ansible Play", + hosts='all', + gather_facts='no', + tasks=[ + dict(action=dict(module=module_name, args=module_args), register='shell_out'), + # dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))) + ] + ) + play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) + + tqm = None + try: + tqm = TaskQueueManager( + inventory=self.inventory, + variable_manager=self.variable_manager, + loader=self.loader, + options=self.options, + passwords=self.passwords, + stdout_callback=self.results_callback, + ) + result = tqm.run(play) + finally: + if tqm is not None: + tqm.cleanup() + + ##定义字典用于接收或者处理结果 + result_raw = {'success': {}, 'failed': {}, 'unreachable': {}, 'skipped': {}, 'status': {}} + + # 循环打印这个结果,success,failed,unreachable需要每个都定义一个 + for host, result in self.results_callback.task_ok.items(): + result_raw['success'][host] = result._result + for host, result in self.results_callback.task_failed.items(): + result_raw['failed'][host] = result._result + for host, result in self.results_callback.task_unreachable.items(): + result_raw['unreachable'][host] = result._result + + result_full = copy.deepcopy(result_raw) + result_json = self.deal_result(result_raw) + if not result_json['failed']: + return result_json + else: + return result_full + # return result_raw + + def run_playbook(self, file, **kwargs): + if not os.path.isfile(file): + raise Exception("%s file not found!" % file) + try: + # extra_vars = {} # 额外的参数 sudoers.yml以及模板中的参数,它对应ansible-playbook test.yml --extra-vars "host='aa' name='cc' " + self.variable_manager.extra_vars = kwargs + + playbook = PlaybookExecutor(playbooks=['' + file], inventory=self.inventory, + variable_manager=self.variable_manager, + loader=self.loader, options=self.options, passwords=self.passwords) + + playbook._tqm._stdout_callback = self.results_callback + playbook.run() + except Exception as e: + print("error:", e.message) + + ##定义字典用于接收或者处理结果 + result_raw = {'success': {}, 'failed': {}, 'unreachable': {}, 'skipped': {}, 'status': {}} + + # 循环打印这个结果,success,failed,unreachable需要每个都定义一个 + for host, result in self.results_callback.task_ok.items(): + result_raw['success'][host] = result._result + for host, result in self.results_callback.task_failed.items(): + result_raw['failed'][host] = result._result + for host, result in self.results_callback.task_unreachable.items(): + result_raw['unreachable'][host] = result._result + + # for host, result in self.results_callback.task_skipped.items(): + # result_raw['skipped'][host] = result._result + # + # for host, result in self.results_callback.task_stats.items(): + # result_raw['status'][host] = result._result + + result_full = copy.deepcopy(result_raw) + result_json = self.deal_result(result_raw) + if not result_json['failed']: + return result_json + else: + return result_full + + +class AnsiInterface(AnsibleAPI): + def __init__(self, hostfile, *args, **kwargs): + super(AnsiInterface, self).__init__(hostfile, *args, **kwargs) + + def copy_file(self, src=None, dest=None): + """ + copy file + """ + module_args = "src=%s dest=%s" % (src, dest) + result = self.run('copy', module_args) + return result + + def exec_command(self, cmds): + """ + commands + """ + result = self.run('command', cmds) + return result + + def exec_script(self, path): + """ + 在远程主机执行shell命令或者.sh脚本 + """ + result = self.run('shell', path) + return result diff --git a/mcrypto.py b/mcrypto.py new file mode 100644 index 0000000..37c350c --- /dev/null +++ b/mcrypto.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +# pip install pycryptodome + +from Crypto.Cipher import AES +from binascii import b2a_hex, a2b_hex + +AES_LENGTH = 16 + + +class Crypto(): + def __init__(self, key): + self.key = key + self.mode = AES.MODE_ECB + self.cryptor = AES.new(self.pad_key(self.key).encode(), self.mode) + + # 加密文本text必须为16的倍数,补足为16的倍数 + def pad(self, text): + while len(text) % AES_LENGTH != 0: + text += '\0' + return text + + # 加密密钥需要长达16位字符,所以进行空格拼接 + def pad_key(self,key): + while len(key) % AES_LENGTH != 0: + key += '\0' + return key + + # 加密 + def encrypt(self, text): + try: + # 这里密钥key 长度必须为16(AES-128)、24(AES-192)、或32(AES-256)Bytes 长度.目前AES-128足够用 + # 加密的字符需要转换为bytes + self.ciphertext = self.cryptor.encrypt(self.pad(text).encode()) + + # 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题 + # 所以这里统一把加密后的字符串转化为16进制字符串 + return b2a_hex(self.ciphertext) + except Exception as e: + return e + + # 解密 + def decrypt(self, text): + try: + plain_text = self.cryptor.decrypt(a2b_hex(text)).decode() + return plain_text.rstrip('\0') + except Exception as e: + return e + + +if __name__ == '__main__': + pc = Crypto('kingsome') # 初始化密钥 + e = pc.encrypt("123456") + d = pc.decrypt(e) + print(e, d) diff --git a/mftp.py b/mftp.py new file mode 100644 index 0000000..4677794 --- /dev/null +++ b/mftp.py @@ -0,0 +1,106 @@ +#!/usr/bin/python +# -*- coding:utf-8 _*- +""" +@author: pengtao +@file: mftp.py +@time: 2020/04/07 +""" + +import os +import sys +import time +import socket +import ftplib + + +class MyFTP: + def __init__(self, data): + self.host = data['host'] + self.port = data.get('port', 21) + self.ftp_dirs = data['ftp_dirs'] + self.user = data['user'] + self.passwd = data['passwd'] + + def get_file_size(self, type='text'): + with ftplib.FTP(self.ftp_dirs) as ftp: + try: + ftp.login(self.user, self.passwd) + if type == 'text': + size = ftp.size('debian/README') + elif type == "binary": + # TYPE A for ASCII mode + ftp.sendcmd('TYPE I') + size = ftp.size('debian/ls-lR.gz') + else: + return None + except ftplib.all_errors as e: + print('FTP error:', e) + return size + + def create_director(self): + with ftplib.FTP(self.ftp_dirs) as ftp: + try: + ftp.login(self.user, self.passwd) + wdir = ftp.pwd() + print(wdir) + + ftp.cwd('debian') + + wdir2 = ftp.pwd() + print(wdir2) + + except ftplib.all_errors as e: + print('FTP error:', e) + + def upload(self): + with ftplib.FTP(self.ftp_dirs) as ftp: + filename = 'README' + try: + ftp.login('user7', 's$cret') + ftp.cwd(self.ftp_path) + ftp.set_pasv(1) + local_file = os.path.join(self.local, filename) + with open(local_file, 'rb') as fp: + res = ftp.storlines("STOR " + filename, fp) + if not res.startswith('226 Transfer complete'): + print('Upload failed') + except ftplib.all_errors as e: + print('FTP error:', e) + + def download(self): + with ftplib.FTP(self.ftp_dirs) as ftp: + file_orig = '/debian/README' + file_copy = 'README' + try: + ftp.login() + with open(file_copy, 'w') as fp: + res = ftp.retrlines('RETR ' + file_orig, fp.write) + if not res.startswith('226 Transfer complete'): + print('Download failed') + if os.path.isfile(file_copy): + os.remove(file_copy) + + except ftplib.all_errors as e: + print('FTP error:', e) + + if os.path.isfile(file_copy): + os.remove(file_copy) + + +if __name__ == "__main__": + data = {'host': '10.10.3.10', 'user': 'ftp', 'passwd': 'kingsome2018'} + my_ftp = MyFTP(data) + + # 下载单个文件 + my_ftp.download_file("G:/ftp_test/XTCLauncher.apk", "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk") + + # 下载目录 + # my_ftp.download_file_tree("G:/ftp_test/", "App/AutoUpload/ouyangpeng/I12/") + + # 上传单个文件 + # my_ftp.upload_file("G:/ftp_test/Release/XTCLauncher.apk", "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk") + + # 上传目录 + # my_ftp.upload_file_tree("G:/ftp_test/", "/App/AutoUpload/ouyangpeng/I12/") + + my_ftp.close() diff --git a/mlog.py b/mlog.py new file mode 100644 index 0000000..c8ce71e --- /dev/null +++ b/mlog.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +import sys +import logging + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s', + datefmt='%Y-%m-%d %A %H:%M:%S', filename="{0}.log".format(sys.argv[0]), filemode='a') +console = logging.StreamHandler() +console.setLevel(logging.DEBUG) # 屏幕显示的级别 +formatter = logging.Formatter('%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s') +console.setFormatter(formatter) +logging.getLogger().addHandler(console) +log = logging.getLogger(__name__) + + +def get_log(): + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s', + datefmt='%Y-%m-%d %A %H:%M:%S', filename="/data/logs/1.log", filemode='a') + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) # 屏幕显示的级别 + formatter = logging.Formatter('%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s') + console.setFormatter(formatter) + logging.getLogger().addHandler(console) + log = logging.getLogger(__name__) + return log + + +def define_logger(filename="/data/logs/mylog.log"): + logger = logging.getLogger("main") + logger.setLevel(logging.INFO) + # 设置输出格式 + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # 设置日志文件处理器 + fh = logging.FileHandler(filename) + fh.setFormatter(formatter) # 为这个处理器添加格式 + + # 设置屏幕stdout输出处理器 + sh = logging.StreamHandler(stream=None) + sh.setFormatter(formatter) + + # 把处理器加到logger上 + logger.addHandler(fh) + logger.addHandler(sh) + diff --git a/mmail.py b/mmail.py new file mode 100644 index 0000000..900f390 --- /dev/null +++ b/mmail.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +def send_mail(sub, body, send_to): + import requests + url = "http://10.10.3.16:50010/mail/api/v2.0/send" + data = {"sub": sub, "content": body, "sendto": send_to} + r = requests.post(url=url, data=data) + if r.status_code == requests.codes.ok: + print(True) + else: + print(False) + + +def send_ops_mail(sub, body, send_to): + import requests + url = "http://10.10.3.10:5011/mail/api/v2.0/alarm" + data = {"sub": sub, "content": body, "sendto": send_to} + r = requests.post(url=url, data=data) + if r.status_code == requests.codes.ok: + print(True) + else: + print(False) + diff --git a/mmongo.py b/mmongo.py new file mode 100644 index 0000000..f3b0396 --- /dev/null +++ b/mmongo.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +import pymongo +import re +from bson.json_util import loads # load MongoDB Extended JSON data + + +class MongodbBase: + + def __init__(self, **args): + self.host = args.get('host') + try: + self.port = int(args.get('port')) + except Exception as e: + raise Exception("get args about port failed,{}".format(e)) + self.dbname = args.get('dbname') + self.collname = args.get('collname', self.dbname) + + try: + # 建立mongodb连接 选择对应的集合 + self.client = pymongo.MongoClient(self.host, self.port) + self.db = self.client[self.dbname] + self.coll = self.db[self.collname] + except Exception as e: + raise ValueError( + 'mongodb connect {}{}{}{} error,out was {1}'.format(self.host, self.port, self.db, self.coll, e)) + + def __del__(self): + self.client.close() + + def select(self, myquery): + try: + data = self.coll.find(myquery={}) + return data + except Exception as e: + print("get data error ,{}".format(e)) + return None + + # 查询 + def query(self, qc=list()): + try: + if isinstance(qc, list): + rel = self.coll.aggregate(qc) + return rel + else: + raise ValueError("arg type is not list!") + except Exception as e: + return e + + # 更新 + def update(self, match_data, update_data): + try: + rel = self.coll.update_many(match_data, {"$set": update_data}) + return "匹配到{0}条数据,修改{1}条数据".format(rel.matched_count, rel.modified_count) + except Exception as e: + return e + + # 插入 + def insert(self, inert_data): + try: + if isinstance(inert_data, list): + rel = self.coll.insert_many(inert_data) + return rel.inserted_ids + else: + raise ValueError("arg type is not list!") + except Exception as e: + return e + + # 导入json数据 + def insert_file(self, file): + try: + ids = self.coll.distinct('_id') + f = open(file, 'r', encoding='utf-8') + data = list() + tmp_data = list() + for i in f: + d = loads(i) + if d['_id'] in ids: + tmp_data.append(d['_id']) + else: + data.append(d) + if tmp_data: + return "数据中包含已存在ObjectId" + else: + rel = self.coll.insert_many(data) + return rel.acknowledged + except Exception as e: + return e + + # 删除 + def delete(self, del_data): + try: + rel = self.coll.delete_many(del_data) + return "{0}个条目已删除".format(rel.deleted_count) + except Exception as e: + return e + + +if __name__ == "__main__": + args = dict() + args['host'] = '127.0.0.1' + # args['host'] = '10.10.5.4' + args['port'] = 27017 + # args['dbname'] = 'dalmatian-production' + args['dbname'] = 'test' + # args['collname'] = 'redemption_record' + match_data = {'ppp': '99999999'} + update_data = {'ppp': '7777777'} + + # 查询条件 + qc = [{"$addFields": {"ouid": {"$toObjectId": "$uid"}}}, + {'$lookup': {'from': 'users', 'localField': 'ouid', 'foreignField': '_id', 'as': 'userinfo'}}] + m = MongodbBase(**args) + # r = m.update(match_data, update_data) + r = m.query() + ii = m.insert_file(file='random_users.json') + print(ii) # for i in r: # print(i) diff --git a/mmysql.py b/mmysql.py new file mode 100644 index 0000000..6f9c5fe --- /dev/null +++ b/mmysql.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +""" +date: 2016/07/11 +role: mysql的增删改查类 +usage: m = mysqlBase(host='xxx',db='xxx',user='xxx',pwd='xxx') 实例化 + m.insert('core',{'host_name':'ccc','process_name':'ddd','ip_addr':'192.168.136.41','status':'4'}) + m.update('table',{'field1':'value1','field2':'value2'},'id=1') 更新表名 字段名:值 条件 + m.delete('core','status=5 and id=12') + m.change("update core set a='aaa' where id=1") 可以多条插入 + m.query("select * from core") +""" +from .mlog import log +import warnings + +try: + import MySQLdb +except: + import pymysql + + pymysql.install_as_MySQLdb() + import MySQLdb + + +###mysql操作类 +class MysqlBase: + + ###连接数据库 + def __init__(self, **args): + + ###获取参数 + self.host = args.get('host', 'localhost') + self.user = args.get('user') + self.pswd = args.get('pswd') + self.db = args.get('db', 'mysql') + self.port = args.get('port', '3306') + self.charset = args.get('charset', 'utf8') + + try: + self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pswd, db=self.db, + port=int(self.port), charset=self.charset) + + self.curs = self.conn.cursor() + + self.curs.execute('SET NAMES utf8') + except: + log.error('%s mysql connect error' % self.host) + raise ValueError('mysql connect error %s' % self.host) + + ###释放资源 + def __del__(self): + self.curs.close() + self.conn.close() + + ###插入 + + def insert(self, table, data): + _field = ','.join(['`%s`' % (k_insert) for k_insert in data.keys()]) + _value = ','.join(["'%s'" % (str(v_insert).replace("'", "\'")) for v_insert in data.values()]) + ###拼接成sql语句 + _sql = 'INSERT INTO `%s`(%s) VALUES(%s)' % (table, _field, _value) + + ###执行 + self.curs.lastrowid = 0 + try: + self.curs.execute(_sql) + ###提交 + self.conn.commit() + # log.info('%s insert ' % _sql) + + except: + self.conn.rollback() + log.error('%s insert error' % _sql) + raise ValueError('112,insert error %s' % _sql) + + return self.curs.lastrowid + + ###更新 + def update(self, table, data, condition): + _field = ','.join(["`%s`='%s'" % (k_update, str(data[k_update]).replace("'", "\'")) for k_update in data]) + + _sql = 'UPDATE `%s` SET %s WHERE %s' % (table, _field, condition) + ###执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + ###提交 + self.conn.commit() + # log.info('%s update ' % _sql) + except Exception: + log.error("run {_sql} failed",exc_info=True) + self.conn.rollback() + # log.error('%s update error' % _sql) + raise ValueError('update error %s' % _sql) + + return resNum + + ###删除 + def delete(self, table, condition): + _sql = 'DELETE FROM `%s` WHERE %s' % (table, condition) + + ###执行 + resNum = 0 + try: + resNum = self.curs.execute(_sql) + ###提交 + self.conn.commit() + # log.info('%s delete ' % _sql) + + except: + self.conn.rollback() + log.error('%s delete error' % _sql) + raise ValueError('112,delete error %s' % _sql) + + return resNum + + ###直接给修改语句执行 + def change(self, sql, many=False): + ###过滤unknow table的warning + warnings.filterwarnings('ignore') + resNum = 0 + if many: + try: + ###多条同时插入 + resNum = self.curs.executemany(sql, many) + self.conn.commit() + # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('exec error %s' % sql) + else: + try: + resNum = self.curs.execute(sql) + ###提交 + self.conn.commit() + # log.info('%s exec ' % sql) + + except: + self.conn.rollback() + log.error('%s exec error' % sql) + raise ValueError('112,exec error %s' % sql) + + return resNum + + ###查询 + def query(self, sql): + res = '' + try: + self.curs.execute(sql) + res = self.curs.fetchall() + # log.info('%s query ' % sql) + + except: + log.error('%s query error' % sql) + + # raise ValueError('query error %s'% sql) + + return res + + +if __name__ == "__main__": + args = dict() + args['host'] = '172.16.17.164' + args['user'] = 'miles' + args['pswd'] = 'aspect' + args['db'] = 'test' + sql_sel = "select * from bigdata_host limit 5" + m = MysqlBase(**args) + data = m.query(sql=sql_sel) + m.insert('bigdata_host', {'hostname': 'ccc', 'remark': 'ddd', 'up_addr_p': '192.168.136.41', 'states': '4', + 'enter_time': '2017-03-13'}) + m.delete('bigdata_host', 'hostname="ccc"') diff --git a/mping.py b/mping.py new file mode 100644 index 0000000..c1c4e25 --- /dev/null +++ b/mping.py @@ -0,0 +1,72 @@ +#!/usr/bin/python +# -*- coding:utf-8 _*- +""" +@author: pengtao +@file: mping.py +@time: 2020/03/26 +""" + +import os +import socket +import struct +import array + + +class Pinger(object): + def __init__(self, timeout=3): + self.timeout = timeout + self.__id = os.getpid() + self.__data = struct.pack('h', 1) # h代表2个字节与头部8个字节组成偶数可进行最短校验 + + @property + def __icmpSocket(self): # 返回一个可以利用的icmp原对象,当做属性使用 + icmp = socket.getprotobyname("icmp") # 指定服务 + sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) # socket.SOCK_RAW原生包 + return sock + + + def __doCksum(self, packet): # 校验和运算 + words = array.array('h', packet) # 将包分割成2个字节为一组的网络序列 + sum = 0 + for word in words: + sum += (word & 0xffff) # 每2个字节相加 + sum = (sum >> 16) + (sum & 0xffff) # 因为sum有可能溢出16位所以将最高位和低位sum相加重复二遍 + sum += (sum >> 16) # 为什么这里的sum不需要再 & 0xffff 因为这里的sum已经是16位的不会溢出,可以手动测试超过65535的十进制数字就溢出了 + return (~sum) & 0xffff # 最后取反返回完成校验 + + @property + def __icmpPacket(self): # icmp包的构造 + header = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) + packet = header + self.__data + cksum = self.__doCksum(packet) + header = struct.pack('bbHHh', 8, 0, cksum, self.__id, 0) # 将校验带入原有包,这里才组成头部,数据部分只是用来做校验所以返回的时候需要返回头部和数据相加 + return header + self.__data + + + def sendPing(self, target_host): + + try: + socket.gethostbyname(target_host) + + sock = self.__icmpSocket + sock.settimeout(self.timeout) + + packet = self.__icmpPacket + + sock.sendto(packet, (target_host, 1)) # 发送icmp包 + + ac_ip = sock.recvfrom(1024)[1][0] + print('[+] %s active' % (ac_ip)) + except Exception: + sock.close() + return False + return True + + +def main(): + pp = Pinger() + pp.sendPing('192.168.100.20') + + +if __name__ == '__main__': + main() diff --git a/mtelnet.py b/mtelnet.py new file mode 100644 index 0000000..5989c2c --- /dev/null +++ b/mtelnet.py @@ -0,0 +1,58 @@ +import logging +import telnetlib +import time + + +class TelnetClient(): + def __init__(self,): + self.tn = telnetlib.Telnet() + + # 此函数实现telnet登录主机 + def login_host(self,host_ip,username,password): + try: + # self.tn = telnetlib.Telnet(host_ip,port=23) + self.tn.open(host_ip,port=23) + except: + logging.warning('%s网络连接失败'%host_ip) + return False + # 等待login出现后输入用户名,最多等待10秒 + self.tn.read_until(b'login: ',timeout=10) + self.tn.write(username.encode('ascii') + b'\n') + # 等待Password出现后输入用户名,最多等待10秒 + self.tn.read_until(b'Password: ',timeout=10) + self.tn.write(password.encode('ascii') + b'\n') + # 延时两秒再收取返回结果,给服务端足够响应时间 + time.sleep(2) + # 获取登录结果 + # read_very_eager()获取到的是的是上次获取之后本次获取之前的所有输出 + command_result = self.tn.read_very_eager().decode('ascii') + if 'Login incorrect' not in command_result: + logging.warning('%s登录成功'%host_ip) + return True + else: + logging.warning('%s登录失败,用户名或密码错误'%host_ip) + return False + + # 此函数实现执行传过来的命令,并输出其执行结果 + def execute_some_command(self,command): + # 执行命令 + self.tn.write(command.encode('ascii')+b'\n') + time.sleep(2) + # 获取命令结果 + command_result = self.tn.read_very_eager().decode('ascii') + logging.warning('命令执行结果:\n%s' % command_result) + + # 退出telnet + def logout_host(self): + self.tn.write(b"exit\n") + +if __name__ == '__main__': + host_ip = '192.168.220.129' + username = 'root' + password = 'abcd1234' + command = 'whoami' + telnet_client = TelnetClient() + # 如果登录结果返加True,则执行命令,然后退出 + if telnet_client.login_host(host_ip,username,password): + telnet_client.execute_some_command(command) + telnet_client.logout_host() \ No newline at end of file diff --git a/mtga.py b/mtga.py new file mode 100644 index 0000000..b1497d2 --- /dev/null +++ b/mtga.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- + +import requests +import json +import datetime +import os +from tgasdk.sdk import * +import pdb +import pymysql +from ops.mmysql import MysqlBase + + +# 初始化命令:pip install ThinkingDataSdk + +class TgaUserData(object): + def __init__(self): + self.gameid = 0 + self.accountid = "" + self.distinctid = "" + self._nickname = "" + self._sex = 0 + self._avatar_url = "" + self._city = "" + self._province = "" + self._country = "" + self.createtime = "" + self.is_hide = 0 + self.lastlogon = "" + self.firstlogonip = "" + # 未实现 先占坑 + self.is_real = 1 + self.unionid = "" + self.wid = "" + self.channel = 0 + self.sid = "" + self.ptid = "" + self.from_appid = "" + self.gamescore_his = 0 + self.gamescore_week = 0 + self.gamescore_last = 0 + + def check_byte(self, i): + if not i: + return "" + elif isinstance(i, bytes): + return pymysql.escape_string(i.decode()) + else: + return pymysql.escape_string(i) + + @property + def nickname(self): + return self._nickname + + @nickname.setter + def nickname(self, nickname): + self._nickname = self.check_byte(nickname) + + @property + def sex(self): + return self._sex + + @sex.setter + def sex(self, sex): + if sex: + try: + self._sex = int(sex) + except Exception as e: + print("ERROR get sex property from tga failed! output was {} ".format(e)) + self._sex = 0 + else: + self._sex = 0 + + @property + def city(self): + return self._city + + @city.setter + def city(self, city): + self._city = self.check_byte(city) + + @property + def avatar_url(self): + return self._avatar_url + + @avatar_url.setter + def avatar_url(self, avatar_url): + self._avatar_url = self.check_byte(avatar_url) + + @property + def province(self): + return self._province + + @province.setter + def province(self, province): + self._province = self.check_byte(province) + + @property + def country(self): + return self._country + + @country.setter + def country(self, country): + self._country = self.check_byte(country) + + @property + def distinctid(self): + return self._distinctid + + @distinctid.setter + def distinctid(self, distinctid): + self._distinctid = self.check_byte(distinctid) + + @property + def from_appid(self): + return self._from_appid + + @from_appid.setter + def from_appid(self, from_appid): + self._from_appid = self.check_byte(from_appid) + + +class CheckWords(object): + def __init__(self): + pass + + def run(self, item): + if not item: + return 0 + if isinstance(item, (str, bytes, int, float)): + try: + return int(item) + except Exception as e: + print("return values failed,output was {}".format(e)) + return 0 + + if isinstance(item, (list, tuple)): + return self.run(item[0]) + + +# 按提供的gameid从tga数据库中查询并输出其对应的url,suffix,appid,api_key,tgaid以dict形式存放 +class GetTgaConfig(): + def __init__(self): + self.url = "http://10.10.3.17:8992/querySql" + self.TGA = {'user': 'mytga', 'pswd': 'gzVwh4HGR68G', 'host': '10.10.3.5', 'db': 'tga'} + + def get_api_key(self, gameid, channel=6001): + item = {} + item['url'] = self.url + sql = "select suffix,appid,api_secret,tgaid from tgainfo where gameid={} and channelid={} and " \ + "in_used=1;".format(gameid, channel) + t = MysqlBase(**self.TGA) + data = t.query(sql) + if data: + item['suffix'], item['appid'], item['api_key'], item['tgaid'] = data[0] + return item + + +class FromTga: + def __init__(self, url, token): + if not token: + raise Exception("{0} token not found in env !") + self.url = url + self.token = token + self.output = "/data/logs/tga-report/" + + def get_data(self, sql): + data = {'token': self.token, 'sql': sql} + r = requests.post(self.url, data) + if r.status_code != requests.codes.ok: + print("connect tga failed!") + return None + + out = r.content.decode('utf-8') + if json.loads(out.split('\r\n')[0]).get('return_code', None) != 0: + # raise Exception("get data from tga failed!") + print("get data from tga failed!,sql was {}".format(sql)) + return None + + data_out = out.split('\r\n')[1:] + output = list() + for row in data_out: + if row: + try: + output.append(json.loads(row)) + except Exception as e: + print("转化数据失败,{} 提示为{}".format(row, e)) + return output + + def init_tga_write(self, tgaid): + ''' + from tgasdk.sdk import TGAnalytics, BatchConsumer, LoggingConsumer, AsyncBatchConsumer + 也可引入TGAnalytics与指定的Consumer + ''' + # now = datetime.date.today().strftime('%Y%m%d%H%M%S') + # # 初始化SDK + # filename = "{}/report_{}_{}.log".format(self.output, project, now) + # 检查目录是否存在,如不存在创建之 + paths = "{}/{}".format(self.output, tgaid) + if not os.path.isdir(paths): + os.makedirs(paths, mode=0o755) + + self.tga = TGAnalytics(LoggingConsumer(paths)) + + def _close_tga(self, tga): + self.tga.flush() + self.tga.close() + + def _split_user_data(self, data): + distinct_id = data.get('distinct_id', None) + account_id = data.get('account_id', None) + + if not (distinct_id or account_id): + print("distinct_id 或 account_id 必须有一则有值!") + return None + + if distinct_id: + data.pop('distinct_id') + if account_id: + data.pop('account_id') + + return distinct_id, account_id, data + + def put_event_data(self, data, event_name="Payment"): + # tga = self._init_tga_write() + try: + distinct_id, account_id, new_data = self._split_user_data(data) + except Exception as e: + print("拆解数据错误,输出为{}.请检查!".format(e)) + return False + + # properties = { + # "#time": datetime.datetime.now(), + # "#ip": "192.168.1.1", + # "Product_Name": "月卡", + # "Price": 30, + # "OrderId": "abc_123" + # } + + # 上传事件,包含账号ID与访客ID + try: + self.tga.track(distinct_id, account_id, event_name, new_data) + except Exception as e: + print("write to tga failed,output was {}".format(e)) + # self._close_tga(tga) + return False + # finally: + # self._close_tga(tga) + return True + + def put_user_data(self, data, method='user_set'): + # tga = self._init_tga_write() + try: + distinct_id, account_id, new_data = self._split_user_data(data) + except Exception as e: + print("拆解数据错误,输出为{}.请检查!".format(e)) + return False + try: + if method.lower() == "user_set": + self.tga.user_set(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_setonce": + self.tga.user_setOnce(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_add": + self.tga.user_add(account_id=account_id, distinct_id=distinct_id, properties=new_data) + elif method.lower() == "user_del": + self.tga.user_del(account_id=account_id, distinct_id=distinct_id) + else: + print("请提供用户操作类型 [user_set/user_setOnce/user_add/user_del] !") + return False + except Exception as e: + print("write to tga failed,output was {}".format(e)) + return False + # finally: + # self._close_tga(tga) + return True + + + +def main(): + url = "http://10.10.3.17:8992/querySql" + # sql = "SELECT \"#server_time\",localuuid,ext FROM v_event_3 where \"#event_name\"='event_1_1'" + sql = "SELECT distinct from_appid FROM v_event_22 where \"$part_event\"='event_11_1' and gameid='2001' and \"$part_date\"='2019-06-18'" + token = "ESnhwwLtVu7zO2h6SSTEZ1jYagbOet0Kur0XnpG9fVJF5ROsqUkcNO0inVyFtQd1" + t = FromTga(url, token) + # t._init_tga_write() + # data={'account_id': 1012, 'distinct_id': 1012, 'gameid': 1012, 'from_appid': 'wx62d9035fd4fd2059', 'time': '2019-03-10', + # 'new_user': 2, 'active_user': 4, 'avg_runing_time': 0, 'old_user_login': 2, 'newuser_rungame': 2, + # 'newuser_rungame_rate': 100.0, 'newuser_qlty': 0} + # t.put_event_data(data) + print(json.dumps(t.get_data(sql))) + + +if __name__ == "__main__": + main() diff --git a/myredis.py b/myredis.py new file mode 100644 index 0000000..445aa0c --- /dev/null +++ b/myredis.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +import redis +import json +import logging +import pdb + +log = logging.getLogger(__name__) + +dev_conf = {"host": "10.10.9.5", "port": "6379"} + + +class Myredis(): + def __init__(self, conf): + if conf.get('passwd', None): + self.r = redis.Redis(host=conf['host'], port=conf['port'], password=conf['passwd'], charset='utf8', + db=conf.get('db') or 0, + decode_responses=True) + else: + self.r = redis.Redis(host=conf['host'], port=conf['port'], charset='utf8', db=conf.get('db') or 0, + decode_responses=True) + + def get_keys(self, keyword): + return self.r.keys(keyword) + + def get_one_key(self, key): + if self.r.type(key) == 'hash': + return self.r.hgetall(key) + elif self.r.type(key) == 'list': + return self.r.lrange(key) + elif self.r.type(key) == 'zset': + return self.r.smembers(key) + elif self.r.type(key) == 'string': + return self.r.get(key) + else: + # log.error(f"{key} {self.r.type(key)} not in define!") + return None + + def set_redis(self, k, v): + if isinstance(v, str): + self.r.set(k, v) + elif isinstance(v, list): + for i in v: + self.r.lpush(k, i) + elif isinstance(v, dict): + self.r.hmset(k, v) + elif isinstance(v, set): + for i in v: + self.r.sadd(k, i) + else: + log.error(f"{v} type not in define!") + + +def main(): + mr = Myredis(dev_conf) + keys = mr.get_keys("*") + for k in keys: + print(f"key={k},values={mr.get_one_key(k)}") + + +if __name__ == "__main__": + main() diff --git a/myrequests.py b/myrequests.py new file mode 100644 index 0000000..6fec055 --- /dev/null +++ b/myrequests.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +import requests +import json + + +def myrequests(url, data, methods="get"): + requests.adapters.DEFAULT_RETRIES = 5 # 增加重连次数 + s = requests.session() + s.keep_alive = False # 关闭多余连接 + + if methods == "get": + return s.get(url=url, data=data) + elif methods == "post": + if isinstance(data, "str"): + headers = {'Content-Type': 'application/json'} + return s.get(url=url, headers=headers, data=json.dumps(data)) + elif isinstance(data, ("tuple", "dict", "list")): + return s.get(url=url, json=data) + else: + print(f"pls input data not {type(data)}\t{data}") + return False + else: + print(f"pls input methods not {methods}") + return False diff --git a/plog.py b/plog.py new file mode 100644 index 0000000..8cd8de8 --- /dev/null +++ b/plog.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import logging + + +def ini_log(filename="/data/logs/aa.log"): + logging.basicConfig(level=logging.ERROR, + format='%(asctime)s %(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s', + datefmt='%Y-%m-%d %A %H:%M:%S', filename=filename, filemode='a') + console = logging.StreamHandler() + console.setLevel(logging.info) # 屏幕显示的级别 + formatter = logging.Formatter('%(filename)s : LINE %(lineno)-4d %(levelname)s %(message)s') + console.setFormatter(formatter) + logging.getLogger().addHandler(console) + +def define_logger(filename="/data/logs/aa.log",debug=True): + logger = logging.getLogger("") + if debug==True: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.ERROR) + + # 设置输出格式 + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # 设置日志文件处理器 + fh = logging.FileHandler(filename) + fh.setFormatter(formatter) # 为这个处理器添加格式 + + # 设置屏幕stdout输出处理器 + formatter_stdout = logging.Formatter('%(name)s - %(levelname)s - %(message)s') + sh = logging.StreamHandler(stream=None) + sh.setFormatter(formatter_stdout) + + # 把处理器加到logger上 + logger.addHandler(fh) +# logger.addHandler(sh) diff --git a/qcloud_api.py b/qcloud_api.py new file mode 100644 index 0000000..d1515aa --- /dev/null +++ b/qcloud_api.py @@ -0,0 +1,53 @@ +# -*- coding: utf8 -*- +# author:tan +# date:2018-12-10 + +""" +腾讯云api通用类 +""" + +from ops.qcloud_sign import QcSign +import requests +import json +import random +import time +import os + + +# 腾讯云api操作 +class QcApi(object): + def __init__(self, endpoint, data, v='3.0'): + self.secret_id = os.environ.get('SECRET_ID') + self.secret_key = os.environ.get('SECRET_KEY') + self.endpoint = endpoint + self.method = "POST" + self.version = v + + self.data = dict() + for i in data: + self.data[i] = data.get(i) + + # 公共参数 + if self.version == '2.0': + if not data.get('Region'): + self.data['Region'] = 'bj' + else: + if not data.get('Region'): + self.data['Region'] = 'ap-beijing' + if self.version != '3.0': + self.data['Version'] = self.version + else: + self.data['Version'] = '2017-03-12' + self.data['SecretId'] = self.secret_id + self.data['Timestamp'] = int(time.time()) # 时间戳 + self.data['Nonce'] = random.randint(1000, 1000000) # 随机正整数 + sign = QcSign(self.secret_key, self.method, self.endpoint, self.data, self.version) + self.data['Signature'] = sign.sign_str() + + def workflow(self): + try: + r = requests.post('https://' + self.endpoint, data=self.data, timeout=10) + rel = json.loads(r.content.decode('utf-8')) + return rel + except: + return False \ No newline at end of file diff --git a/qcloud_api_v2.py b/qcloud_api_v2.py new file mode 100644 index 0000000..a964412 --- /dev/null +++ b/qcloud_api_v2.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import os + +from tencentcloud.common import credential +from tencentcloud.common.profile.client_profile import ClientProfile +from tencentcloud.common.profile.http_profile import HttpProfile +from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException +from tencentcloud.cvm.v20170312 import cvm_client, models +from ops.mmysql import MysqlBase + + +class Qclound(): + def __init__(self, area="ap-beijing"): + secretid = os.getenv("miles_id") + secretkey = os.getenv("miles_key") + + cred = credential.Credential(secretid, secretkey) + httpProfile = HttpProfile() + httpProfile.endpoint = "cvm.tencentcloudapi.com" + + clientProfile = ClientProfile() + clientProfile.httpProfile = httpProfile + self.client = cvm_client.CvmClient(cred, area, clientProfile) + + def start_instance(self, instanceid): + try: + req = models.StartInstancesRequest() + params = '{"InstanceIds":["%s"]}' % instanceid + req.from_json_string(params) + + resp = self.client.StartInstances(req) + print(resp.to_json_string()) + data = resp.to_json_string() + return True, data + + except TencentCloudSDKException as err: + print(err) + return False, err + + def stop_instance(self, instanceid): + try: + req = models.StopInstancesRequest() + params = '{"InstanceIds":["%s"],"StoppedMode":"STOP_CHARGING"}' % instanceid + req.from_json_string(params) + + resp = self.client.StopInstances(req) + print(resp.to_json_string()) + data = resp.to_json_string() + return True, data + except TencentCloudSDKException as err: + print(err) + return True, err + + def desc_instance(self, instanceid): + try: + req = models.DescribeInstancesStatusRequest() + params = '{"InstanceIds":["%s"]}' % instanceid + req.from_json_string(params) + + resp = self.client.DescribeInstancesStatus(req) + print(resp.to_json_string()) + data = resp.InstanceStatusSet[0].InstanceState + return True, data + except TencentCloudSDKException as err: + print(err) + return True, err + + def get_all_instance(self): + all = {} + try: + req = models.DescribeInstancesRequest() + for i in range(10): + params = '{"Offset":%s,"Limit":20}' % str(i) + req.from_json_string(params) + resp = self.client.DescribeInstances(req) + if resp: + for item in resp.InstanceSet: + if item: + all[item.PrivateIpAddresses[0]] = item.InstanceId + else: + break + except TencentCloudSDKException as err: + print(err) + return all + + def get_instanceid(self, ip): + all = self.get_all_instance() + if ip in all.keys(): + return all[ip] + else: + return None + + +if __name__ == "__main__": + qc = Qclound('ap-beijing') + # aa = qc.get_all_instance() + #print(aa) + id = qc.get_instanceid('10.10.7.17') + print("2 {}".format(id)) + if id: + qc.desc_instance(id) + qc.stop_instance(id) + #qc.start_instance(id) diff --git a/qcloud_data_api.py b/qcloud_data_api.py new file mode 100644 index 0000000..f45d410 --- /dev/null +++ b/qcloud_data_api.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +import requests +import redis +import json +import pdb + +TOKEN_REDIS = {'host': '10.10.3.10', 'port': 6379, 'db': 3} + + +class QcloudDataApi(): + def __init__(self, appid, secret): + self.appid = appid + self.secret = secret + pool = redis.ConnectionPool(host=TOKEN_REDIS['host'], port=TOKEN_REDIS['port'], db=TOKEN_REDIS['db'], + decode_responses=True) + self.redis = redis.Redis(connection_pool=pool) + self.token = self.get_access_token() + + def get_access_token(self): + token = self.redis.get(self.appid) + if not token: + token_url = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={" \ + "}".format(self.appid, self.secret) + r = requests.get(token_url) + if r.status_code == 200: + data = r.json() + try: + self.redis.set(self.appid, data['access_token']) + self.redis.expire(self.appid, data['expires_in']) + token = data['access_token'] + except Exception as e: + print("write token to redis failed ,{}!".format(e)) + token = None + return token + + def get_data(self): + if not self.token: + print("get token failed ,pls check it first!") + return False + self.get_getUserPortrait() + + def get_getUserPortrait(self): + url1="https://api.weixin.qq.com/datacube/getweanalysisappiddailyretaininfo?access_token={}".format(self.token) + url2="https://api.weixin.qq.com/datacube/getweanalysisappiddailyvisittrend?access_token={}".format(self.token) + url = "https://api.weixin.qq.com/datacube/getweanalysisappiduserportrait?access_token={}".format(self.token) + post_data = {} + post_data['access_token'] = self.token + post_data['end_date'] = '20190617' + post_data['begin_date'] = '20190610' + headers = {'Content-Type': 'application/json'} + r = requests.post(url=url2, headers=headers, data=json.dumps(post_data)) + if r.status_code == 200: + data = r.json() + print("{}".format(data)) + + +def main(): + appid = "wxf567b46cecf6ec58" + secret = "dea5e1a99debd567eb4532a032bdc62d" + qc = QcloudDataApi(appid, secret) + qc.get_data() + + +if __name__ == "__main__": + main() diff --git a/qcloud_sign.py b/qcloud_sign.py new file mode 100644 index 0000000..544f8f3 --- /dev/null +++ b/qcloud_sign.py @@ -0,0 +1,35 @@ +# -*- coding: utf8 -*- +# author:tan +# date:2018-12-10 +# https://cloud.tencent.com/document/api/213/15693 + +""" +腾讯云api登陆鉴权 +""" + +import base64 +import hashlib +import hmac + + +# 接口鉴权 +class QcSign: + def __init__(self, secret_key, method, endpoint, params, v='3.0'): + self.method = method + self.endpoint = endpoint + self.sign_method = hashlib.sha1 + self.secret_key = secret_key + self.params = params + self.version = v + + # 生成签名串 + def sign_str(self): + if self.version == '2.0': + s = self.method + self.endpoint + "?" + else: + s = self.method + self.endpoint + "/?" + query_str = "&".join("%s=%s" % (k, self.params[k]) for k in sorted(self.params)) + # 拼接签名原文字符串 请求方法 + 请求主机 +请求路径 + ? + 请求字符串 + string_to_sign = s + query_str + hmac_str = hmac.new(self.secret_key.encode("utf8"), string_to_sign.encode("utf8"), self.sign_method).digest() + return base64.b64encode(hmac_str) \ No newline at end of file diff --git a/vcs.py b/vcs.py new file mode 100644 index 0000000..77e4c80 --- /dev/null +++ b/vcs.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from .mlog import log +from .common import run_cmd +import os +import shutil + + +class GitRepository(): + def __init__(self): + self.base_dir = "/data/publish/git" + + def checkout(self, git_addr, project, tag): + git_dir = "{0}/{1}".format(self.base_dir, tag) + if os.path.isdir(git_dir): + shutil.rmtree(git_dir) + + clone_cmd = "cd {0} && git clone --recursive --branch {2} {1} {2} ".format(self.base_dir, git_addr, tag) + status, out = run_cmd(clone_cmd) + if status: + log.info("git clone {0} success".format(project)) + else: + log.error("git clone {0} Failed,out was {1}".format(project, out)) + raise Exception("git clone Failed") + + # tag_dir = "{0}/{1}".format(self.base_dir, tag) + # get_tag_cmd = "cd {0} && git checkout {1} && git submodule update ".format(tag_dir, tag) + # status, out = run_cmd(get_tag_cmd) + # if status: + # log.info("git tag {0} {1} success".format(project, tag)) + # else: + # log.error("git tag {0} {1} Failed,out was {2}".format(project, tag, out)) + # raise Exception("git tag Failed") + + return True + + +class SubversionRepository(): + def __init__(self): + self.base_dir = "/data/publish/svn" + + def checkout(self, svn_add, project, tag, git_dir=None): + # 实际tag无用,只记录,显示 + if not git_dir: + git_dir = "{0}/{1}".format(self.base_dir, project) + check_out_cmd = "svn export --force --no-auth-cache --non-interactive {0} {1}".format(svn_add, git_dir) + status, out = run_cmd(check_out_cmd) + if status: + log.info("svn co {0} ,tag {1} success".format(project, tag)) + return True + else: + log.error("svn co {0} {1} Failed,out was {2}".format(project, tag, out)) + raise Exception("SVN checkout Failed!")