122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
import sys
|
|
from .mlog import log
|
|
import functools
|
|
|
|
|
|
def tarce(f):
|
|
@functools.wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
print(f"func name was {__name__}, args was {f}, {args}, {kwargs}")
|
|
result = f(*args, **kwargs)
|
|
print(f"return was {result}!")
|
|
|
|
return decorated_function
|
|
|
|
|
|
def send_ops_mail(sub, body, sendto):
|
|
import requests
|
|
mail_url = 'http://10.10.3.10:5011/mail/api/v2.0/alarm'
|
|
data = {"sub": sub, "content": body, "sendto": sendto}
|
|
r = requests.post(url=mail_url, data=data)
|
|
if r.status_code == 200:
|
|
log.info("send mail success!")
|
|
|
|
else:
|
|
err_msg = "send mail failed ,output was %s" % r.content
|
|
log.error("send mail failed ,{0}".format(err_msg))
|
|
|
|
|
|
def send_alter_mail(sub, body, sendto):
|
|
import requests
|
|
mail_url = 'http://10.10.3.16:50010/mail/api/v2.0/send'
|
|
data = {"sub": sub, "content": body, "sendto": sendto}
|
|
r = requests.post(url=mail_url, data=data)
|
|
if r.status_code == 200:
|
|
log.info("send mail success!")
|
|
else:
|
|
err_msg = "send mail failed ,output was %s" % r.content
|
|
log.error("send mail failed ,{0}".format(err_msg))
|
|
|
|
|
|
def run_cmd(cmd):
|
|
import subprocess
|
|
msg = "Starting run: %s " % cmd
|
|
#log.info("run_cmd {0}".format(msg))
|
|
cmdref = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
|
output, error_info = cmdref.communicate()
|
|
if cmdref.returncode != 0:
|
|
if isinstance(error_info, list) or isinstance(error_info, tuple):
|
|
error_info = error_info[0]
|
|
#msg = "RUN %s ERROR,error info: %s" % (cmd, error_info)
|
|
#log.error(msg)
|
|
return False, error_info
|
|
|
|
else:
|
|
msg = "run %s success" % cmd
|
|
#log.info(msg)
|
|
# print "Run Success!!"
|
|
return True, output
|
|
|
|
|
|
def str_datetime(step=0):
|
|
import datetime
|
|
return (datetime.datetime.now() - datetime.timedelta(int(step))).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def str_short_time(step=0):
|
|
import datetime
|
|
return (datetime.datetime.now() - datetime.timedelta(int(step))).strftime("%Y%m%d%H%M%S")
|
|
|
|
|
|
def write_files(filename, data):
|
|
with open(filename, 'a+') as f:
|
|
f.write(data)
|
|
f.write('\n')
|
|
|
|
|
|
def read_files(filename):
|
|
with open(filename, 'r') as f:
|
|
data = f.read().strip()
|
|
return data
|
|
|
|
|
|
def cal_crc32(data):
|
|
import binascii
|
|
return binascii.crc32(data.encode())
|
|
|
|
|
|
def cal_db_num(data, db=16):
|
|
import binascii
|
|
data_crc = binascii.crc32(data.encode())
|
|
return int(data_crc % db) + 1
|
|
|
|
|
|
def to_str(bytes_or_str):
|
|
if isinstance(bytes_or_str, bytes):
|
|
value = bytes_or_str.decode('utf-8')
|
|
else:
|
|
value = bytes_or_str
|
|
return value
|
|
|
|
|
|
# 返回格式化时间的下一天
|
|
def next_day(day, i=1):
|
|
import datetime
|
|
try:
|
|
begin = datetime.datetime.strptime(day, '%Y-%m-%d')
|
|
except:
|
|
raise Exception("PLS input time as '2019-03-01!'")
|
|
next_day = (begin + datetime.timedelta(days=i))
|
|
return datetime.datetime.strftime(next_day, "%Y-%m-%d")
|
|
|
|
|
|
def class2dict(a):
|
|
try:
|
|
dd = dict((name, getattr(a, name)) for name in dir(a) if
|
|
(not name.startswith('_') and not name.startswith('__') and not callable(getattr(a, name))))
|
|
except Exception as e:
|
|
print("{} 转换到dict失败 ,{}".format(a, e))
|
|
return None
|
|
return dd # a=open(filename,'r').read() # time.strftime("%Y%m%d %H:%M:%S") # print( '20180807 15:23:29') # # time.strptime("20180702","%Y%m%d") # time.struct_time(tm_year=2018, tm_mon=7, tm_mday=2, tm_hour=0, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=183, tm_isdst=-1)
|