调整缓存数据生成脚本

This commit is contained in:
pengtao 2019-09-29 11:47:47 +08:00
parent ec58deeece
commit ee0e3164e2
4 changed files with 201 additions and 183 deletions

View File

@ -1,187 +1,6 @@
# -*- coding: utf-8 -*-
from myredis.myredis import my_redis
import datetime
from mysql.mmysql import MysqlBase
from prod_config import mysql_promotion_config, log_path, info_expire, adkey_expire
from log.mylog import define_logger
import logging
import json
import copy
define_logger(f"{log_path}/ad_tasks.log")
log = logging.getLogger(__name__)
limit = 100
mydb = MysqlBase(**mysql_promotion_config)
remove_list = ('ad_num', 'gameid')
def get_area_by_locationid(ldid):
area = []
sql = f"select area from location where id={ldid}"
data = mydb.query(sql)
if data:
return data[0][0].strip('[]').replace('"', "")
else:
log.error(f"get area from location failed!,sql={sql}")
return area
def split_ad_info(localtionid, line):
# 从locationid获取位置相关属性与ad信息打包产生提供给接口用的数据
new = copy.deepcopy(line)
location_sql = f"select x,y,x_offset,y_offset,type,mode,ld_property from location WHERE id={localtionid}"
data = mydb.query(location_sql)
try:
new['x'], new['y'], new['x_offset'], new['y_offset'], new['type'], new['mode'], new['ld_property'] = data[0]
new['area'] = f"{new['area']},{new['x']},{new['y']},{new['x_offset']},{new['y_offset']}"
except Exception:
log.error(f"get localtion info by sql={location_sql} error!", exc_info=True)
return None
for item in remove_list:
new.pop(item)
return new
def sadd_adkey(key, line):
locationid = line['locationid']
redis_key = f"{line['id']}_{locationid}"
my_redis.sadd(key, redis_key)
my_redis.expire(key, adkey_expire)
#log.info(f"add {redis_key} 2 {key}!")
info_key = f"adinfo::{redis_key}::info"
new_line = split_ad_info(locationid, line)
if new_line:
my_redis.hmset(info_key, new_line)
my_redis.expire(info_key, info_expire)
#log.info(f"add info {new_line} 2 {info_key}!")
else:
log.error(f"split adinfo about locationid failed! localtionid={locationid} ")
def send_cache_data():
log.info("start produce cache !")
now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
all = []
get_data_sql = f"""select
id,
name,
ad_num,
ad_title,
ad_body,
ad_image,
jump_param,
ad_sort,
companyid,
gameid,
channelid,
jump_status,
locationid ,
ad_property,
createtime
from
ad
where
status=1
and in_used=1
and '{now}'> begin_time
and '{now}'<end_time ;
"""
data = mydb.query(get_data_sql)
if data:
for line in data:
if line:
item = {}
try:
item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item[
'jump_param'], item['ad_sort'], item['companyid'], item['gameid'], item['channelid'], item[
'jump_status'], item['locationid'], item['ad_property'],item['createtime'] = line
item['createtime'] = datetime.datetime.strftime(item['createtime'], "%Y-%m-%d %H:%M:%S")
if item.get('jump_param', "") and (item['ad_property'].find("jump_param") == -1):
try:
temp = {}
temp['jump_param'] = item.get('jump_param', "")
if not item['ad_property']:
item['ad_property'] = {}
if not isinstance(item.get('ad_property', {}), dict):
if not item.get('ad_property', {}):
item['ad_property'] = {}.update(temp)
else:
item['ad_property'] = json.loads(
item.get('ad_property', {}).replace("", '"').replace("", '"'))
item['ad_property'].update(temp)
item['ad_property'] = json.dumps(item['ad_property'])
except Exception:
log.error(f"write {item}", exc_info=True)
all.append(item)
except Exception:
log.error("split data failed", exc_info=True)
# 检查ID是否存在播放列表中以及播放次数是否完毕
if all:
for line in all:
if line:
locationids = json.loads(line.get('locationid'))
for locationid in str(locationids).strip('[]').split(','):
locationid = locationid.strip()
area_all = get_area_by_locationid(locationid)
for area in area_all.split(','):
area = area.strip()
line['locationid'] = locationid
line['area'] = area
key = f"ad::{line.get('gameid', 0)}_{locationid}_{area}::{line.get('channelid', 0)}::{area}::{locationid}"
if int(line['ad_num']) == 0:
sadd_adkey(key, line)
elif int(line['ad_num']) > 0:
num = my_redis.get(f"adnum::{line['id']}_{locationid}::num")
print(f" get {line['id']} {locationid} num was {num}!")
if not num:
num = 0
if int(line['ad_num']) > int(num):
sadd_adkey(key, line)
else:
redis_key = f"{line['id']}_{locationid}"
if my_redis.sismember(key, redis_key):
my_redis.srem(key, redis_key)
log.info(f"remove {redis_key} from {key}!")
else:
log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}")
log.info("end produce cache!")
# # 删除过期的数据
# log.info("remove expire data from cache!")
# expire_sql = f"""select
# id,gameid,channelid,locationid,status
# from
# ad
# where
# '{now}'> end_time
# or status in (3,4) or in_used=0"""
# remove_data = mydb.query(expire_sql)
# if remove_data:
# for line in remove_data:
# try:
# id, gameid, channelid, locationids, status = line
# for locationid in locationids.replace("[", "").replace("]", "").replace('"', "").split(','):
# locationid = locationid.strip()
# area_all = get_area_by_locationid(locationid)
# if area_all:
# for area in area_all.split(','):
# area = area.strip()
# key = f"ad::{gameid}_{locationid}::{channelid}::{area}::{locationid}"
# values = f"{id}_{locationid}"
# if my_redis.sismember(key, values):
# my_redis.srem(key, values)
# my_redis.expire(f"adinfo::{values}::info", 1)
# log.info(f"remove {id} from {key} success!")
# except Exception:
# log.error("拆解过期数据出错!", exc_info=True)
from ops.ad_produce import produce_task
if __name__ == "__main__":
#log.info('start!')
send_cache_data()
produce_task()

32
clear_redis.py Normal file
View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from myredis.myredis import my_redis
from log.mylog import define_logger
import logging
define_logger("/data/logs/ops/clear_redis.log")
log = logging.getLogger(__name__)
def find_key(partten):
keys = []
for key in my_redis.keys(pattern=partten):
if key:
keys.append(key)
return keys
def clear_keys(keys):
for key in keys:
my_redis.expire(key, 1)
log.info(f"expirt {len(keys)} success!")
def main():
partten = "ad::*::num"
keys = find_key(partten)
if keys:
clear_keys(keys)
if __name__ == "__main__":
main()

154
ops/ad_produce.py Normal file
View File

@ -0,0 +1,154 @@
# -*- coding: utf-8 -*-
from myredis.myredis import my_redis
import datetime
from mysql.mmysql import MysqlBase
from prod_config import mysql_promotion_config, log_path, info_expire, adkey_expire
from log.mylog import define_logger
import logging
import json
import copy
from ops.base import split_list
define_logger(f"{log_path}/ad_tasks.log")
log = logging.getLogger(__name__)
limit = 100
mydb = MysqlBase(**mysql_promotion_config)
remove_list = ('ad_num', 'gameid')
def get_area_by_locationid(ldid):
area = []
sql = f"select area from location where id={ldid}"
data = mydb.query(sql)
if data:
return data[0][0].strip('[]').replace('"', "")
else:
log.error(f"get area from location failed!,sql={sql}")
return area
def split_ad_info(localtionid, line):
# 从locationid获取位置相关属性与ad信息打包产生提供给接口用的数据
new = copy.deepcopy(line)
location_sql = f"select x,y,x_offset,y_offset,type,mode,ld_property from location WHERE id={localtionid}"
data = mydb.query(location_sql)
try:
new['x'], new['y'], new['x_offset'], new['y_offset'], new['type'], new['mode'], new['ld_property'] = data[0]
new['area'] = f"{new['area']},{new['x']},{new['y']},{new['x_offset']},{new['y_offset']}"
except Exception:
log.error(f"get localtion info by sql={location_sql} error!", exc_info=True)
return None
for item in remove_list:
new.pop(item)
return new
def sadd_adkey(key, line):
locationid = line['locationid']
redis_key = f"{line['id']}_{locationid}"
my_redis.sadd(key, redis_key)
my_redis.expire(key, adkey_expire)
# log.info(f"add {redis_key} 2 {key}!")
info_key = f"adinfo::{redis_key}::info"
new_line = split_ad_info(locationid, line)
if new_line:
my_redis.hmset(info_key, new_line)
my_redis.expire(info_key, info_expire) # log.info(f"add info {new_line} 2 {info_key}!")
else:
log.error(f"split adinfo about locationid failed! localtionid={locationid} ")
def produce_task():
log.info("start produce cache !")
now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
all = []
get_data_sql = f"""select
id,
name,
ad_num,
ad_title,
ad_body,
ad_image,
jump_param,
ad_sort,
companyid,
gameid,
channelid,
jump_status,
locationid ,
ad_property,
createtime
from
ad
where
status=1
and in_used=1
and '{now}'> begin_time
and '{now}'<end_time ;
"""
data = mydb.query(get_data_sql)
if data:
for line in data:
if line:
try:
field = ['id', 'name', 'ad_num', 'ad_title', 'ad_body', 'ad_image', 'jump_param', 'ad_sort',
'companyid', 'gameid', 'channelid', 'jump_status', 'locationid', 'ad_property', 'createtime']
item = split_list(field, line)
if item:
if item.get('jump_param', "") and (item['ad_property'].find("jump_param") == -1):
try:
temp = {}
temp['jump_param'] = item.get('jump_param', "")
if not item['ad_property']:
item['ad_property'] = {}
if not isinstance(item.get('ad_property', {}), dict):
if not item.get('ad_property', {}):
item['ad_property'] = {}.update(temp)
else:
item['ad_property'] = json.loads(
item.get('ad_property', {}).replace("", '"').replace("", '"'))
item['ad_property'].update(temp)
item['ad_property'] = json.dumps(item['ad_property'])
except Exception:
log.error(f"write {item}", exc_info=True)
all.append(item)
except Exception:
log.error("split data failed", exc_info=True)
# 检查ID是否存在播放列表中以及播放次数是否完毕
if all:
for line in all:
if line:
try:
locationids = json.loads(line.get('locationid'))
for locationid in str(locationids).strip('[]').split(','):
locationid = locationid.strip()
area_all = get_area_by_locationid(locationid)
for area in area_all.split(','):
area = area.strip()
line['locationid'] = locationid
line['area'] = area
str01 = f"{line.get('gameid', 0)}_{locationid}_{area}"
key = "ad::{0}::{1}::{2}".format(str01, area, locationid)
if int(line['ad_num']) == 0:
sadd_adkey(key, line)
elif int(line['ad_num']) > 0:
num = my_redis.get(f"adnum::{line['id']}_{locationid}::num")
print(f" get {line['id']} {locationid} num was {num}!")
if not num:
num = 0
if int(line['ad_num']) > int(num):
sadd_adkey(key, line)
else:
redis_key = f"{line['id']}_{locationid}"
if my_redis.sismember(key, redis_key):
my_redis.srem(key, redis_key)
log.info(f"remove {redis_key} from {key}!")
else:
log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}")
except Exception:
log.error(f"split {line} failed!", exc_info=True)
log.info("end produce cache!")

View File

@ -11,3 +11,16 @@ def my_quote(data):
return urlencode(data)
elif isinstance(data, str):
return quote(data)
def split_list(field, data_list):
data = dict()
if not data_list:
return None
if not (isinstance(field, list) and isinstance(data_list, list)):
return None
if len(field) != len(data_list):
return None
for i, v in enumerate(data_list):
data[v] = field[i]
return data