promotion/ad_tasks.py
2019-09-17 16:39:16 +08:00

187 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from myredis.myredis import my_redis
import datetime
from mysql.mmysql import MysqlBase
from prod_config import mysql_promotion_config, log_path, ad_list_interface_port, BEGIN, END
from log.mylog import define_logger
import logging
import pdb
import json
import copy
# Configure logging to write to ad_tasks.log under log_path, then take this
# module's logger. define_logger must run before the logger is used.
define_logger(f"{log_path}/ad_tasks.log")
log = logging.getLogger(__name__)
# NOTE(review): `limit` is not referenced anywhere in the visible part of this file.
limit = 100
# NOTE(review): these two assignments rebind the BEGIN/END names imported from
# prod_config above, so within this module the imported values are discarded.
# Confirm that overriding the configuration is intentional.
BEGIN = '1999-01-01'
END = '3000-01-01'
# Shared MySQL handle built from the promotion database settings.
mydb = MysqlBase(**mysql_promotion_config)
# Keys that split_ad_info() removes from an ad dict before returning it.
remove_list = ('ad_num', 'gameid')
def get_area_by_locationid(ldid):
    """Return the areas stored for a location as a comma-separated string.

    Reads the ``area`` column of the ``location`` row whose id is ``ldid`` and
    removes every ``[``, ``]`` and ``"`` character from it, so a stored value
    such as ``["a","b"]`` comes back as ``a,b``.

    Args:
        ldid: Id of the location row; it is interpolated into the SQL text.

    Returns:
        The cleaned area string, or ``""`` when the query returns no row or
        the column is NULL (an error is logged in that case). The result is
        always a ``str`` so callers can safely call ``.split(',')`` on it;
        note that ``"".split(',')`` yields ``['']``, so callers should test
        the result for truthiness before splitting.
    """
    # NOTE(review): ldid is formatted straight into the statement. The query
    # helper's parameter-binding API is not visible from here, so this is left
    # as is; do not pass untrusted values.
    sql = f"select area from location where id={ldid}"
    data = mydb.query(sql)
    raw_area = data[0][0] if data else None
    if raw_area is None:
        log.error(f"get area from location failed!,sql={sql}")
        return ""
    return raw_area.replace("[", "").replace("]", "").replace('"', "")
def split_ad_info(localtionid, line):
    """Combine a location's placement attributes with an ad record.

    Looks up the in-use ``location`` row with id ``localtionid`` and returns a
    deep copy of ``line`` extended with ``x``, ``y``, ``x_offset``,
    ``y_offset``, ``type``, ``mode`` and ``ld_property``. The copy's ``area``
    becomes ``"<area>,<x>,<y>,<x_offset>,<y_offset>"`` and every key named in
    ``remove_list`` is dropped. ``line`` itself is never modified.

    Args:
        localtionid: Id of the location row (interpolated into the SQL text).
        line: Ad record as a dict; it must contain an ``area`` entry.

    Returns:
        The merged dict, or ``None`` when the location cannot be loaded or
        merged (an error is logged).
    """
    location_sql = (
        "select x,y,x_offset,y_offset,type,mode,ld_property from location "
        f"WHERE in_used=1 and id={localtionid}"
    )
    data = mydb.query(location_sql)
    if not data:
        # No in-use location row: nothing to merge, and no traceback to show.
        log.error(f"get localtion info by sql={location_sql} error!")
        return None

    new = copy.deepcopy(line)
    try:
        (new['x'], new['y'], new['x_offset'], new['y_offset'],
         new['type'], new['mode'], new['ld_property']) = data[0]
        new['area'] = f"{new['area']},{new['x']},{new['y']},{new['x_offset']},{new['y_offset']}"
    except Exception:
        # Covers a row of unexpected width and a record without 'area'.
        log.error(f"get localtion info by sql={location_sql} error!", exc_info=True)
        return None

    for item in remove_list:
        # A default keeps a record that lacks one of these keys from raising.
        new.pop(item, None)
    return new
def sadd_adkey(key, line):
    """Add an ad to the Redis set ``key`` and cache its detail hash.

    The set member is ``"<ad id>_<location id>"``; the set's TTL is reset to
    two minutes on every call. The ad's details, as produced by
    ``split_ad_info``, are stored in the hash
    ``adinfo::<member>::info`` with a three-hour TTL. When the details cannot
    be built an error is logged and no hash is written.

    Args:
        key: Name of the Redis set to add the ad to.
        line: Ad record as a dict containing at least ``id`` and
            ``locationid``.
    """
    loc_id = line['locationid']
    member = f"{line['id']}_{loc_id}"

    # Register the member first, then refresh the set's lifetime.
    my_redis.sadd(key, member)
    my_redis.expire(key, 60*2)

    detail_key = f"adinfo::{member}::info"
    details = split_ad_info(loc_id, line)
    if not details:
        log.error(f"split adinfo about locationid failed! localtionid={loc_id} ")
        return

    my_redis.hmset(detail_key, details)
    my_redis.expire(detail_key, 60 * 60*3)
# Columns read from the ``ad`` table, in SELECT order. The same tuple names the
# keys of each ad dict, so the query and the record layout cannot drift apart.
_AD_COLUMNS = (
    'id', 'name', 'ad_num', 'ad_title', 'ad_body', 'ad_image', 'jump_param',
    'ad_sort', 'companyid', 'gameid', 'channelid', 'jump_status',
    'locationid', 'ad_property', 'createtime',
)


def _ad_set_key(gameid, channelid, locationid, area):
    """Build the name of the Redis set listing the ads of one game/channel/location/area."""
    return f"ad::{gameid}_{locationid}_{area}::{channelid}::{area}::{locationid}"


def _merge_jump_param(ad_property, jump_param):
    """Return ``ad_property`` as JSON text with ``jump_param`` merged into it.

    ``ad_property`` is returned unchanged when ``jump_param`` is empty or when
    the property text already mentions ``jump_param``. Raises when the
    property text is not a JSON object.
    """
    if not jump_param:
        return ad_property
    if isinstance(ad_property, str) and "jump_param" in ad_property:
        return ad_property

    if not ad_property:
        # NULL or empty column: start from an empty object instead of failing.
        props = {}
    elif isinstance(ad_property, dict):
        props = dict(ad_property)
    else:
        # Normalise typographic quotes to ASCII so the text parses as JSON.
        # NOTE(review): the previous code searched for empty strings here,
        # which inserts a quote between every character; curly quotes are the
        # presumed intent. Confirm against the upstream file.
        props = json.loads(
            ad_property.replace("\u201c", '"').replace("\u201d", '"'))
        if not isinstance(props, dict):
            raise ValueError(f"ad_property is not a JSON object: {ad_property!r}")

    props['jump_param'] = jump_param
    return json.dumps(props)


def _load_active_ads(now):
    """Query the ads that are enabled and inside their display window at ``now``.

    Returns a list of dicts keyed by ``_AD_COLUMNS``. Rows that cannot be
    decoded are logged and skipped.
    """
    get_data_sql = f"""select
    {', '.join(_AD_COLUMNS)}
    from
    ad
    where
    status=1
    and in_used=1
    and '{now}'> begin_time
    and '{now}'<end_time ;
    """
    ads = []
    for row in mydb.query(get_data_sql) or ():
        if not row:
            continue
        try:
            if len(row) != len(_AD_COLUMNS):
                raise ValueError(
                    f"expected {len(_AD_COLUMNS)} columns, got {len(row)}")
            item = dict(zip(_AD_COLUMNS, row))
            item['createtime'] = item['createtime'].strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            log.error("split data failed", exc_info=True)
            continue
        try:
            item['ad_property'] = _merge_jump_param(
                item['ad_property'], item['jump_param'])
        except Exception:
            # Keep the ad with its stored ad_property; only the merge is lost.
            log.error(f"write {item}", exc_info=True)
        ads.append(item)
    return ads


def _publish_ad(line):
    """Add one ad to (or drop it from) the Redis set of each location/area it targets.

    ``line`` is updated in place with the ``locationid`` and ``area`` being
    processed, because ``sadd_adkey`` reads them from the record.
    """
    ad_num = int(line['ad_num'])
    locationids = json.loads(line.get('locationid'))
    if not isinstance(locationids, (list, tuple)):
        # A single id stored without the surrounding list.
        locationids = [locationids]

    for locationid in locationids:
        area_all = get_area_by_locationid(locationid)
        if not area_all:
            # The lookup already logged the failure; nothing to publish here.
            continue
        for area in area_all.split(','):
            line['locationid'] = locationid
            line['area'] = area
            key = _ad_set_key(
                line.get('gameid', 0), line.get('channelid', 0), locationid, area)
            if ad_num == 0:
                # ad_num == 0 is added unconditionally; no play counter is read.
                sadd_adkey(key, line)
            elif ad_num > 0:
                num = my_redis.get(f"adnum::{line['id']}_{locationid}::num")
                log.debug(f" get {line['id']} {locationid} num was {num}!")
                played = int(num) if num else 0
                if ad_num > played:
                    sadd_adkey(key, line)
                else:
                    # Play count used up: take the ad out of the set.
                    redis_key = f"{line['id']}_{locationid}"
                    if my_redis.sismember(key, redis_key):
                        my_redis.srem(key, redis_key)
                        log.info(f"remove {redis_key} from {key}!")
            else:
                log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}")


def _remove_expired_ads(now):
    """Remove ads that ended before ``now`` or have status 3/4 from the Redis sets."""
    expire_sql = f"""select
    id,gameid,channelid,locationid,status
    from
    ad
    where
    '{now}'> end_time
    or status in (3,4) """
    for row in mydb.query(expire_sql) or ():
        try:
            ad_id, gameid, channelid, locationids, _status = row
            cleaned = locationids.replace("[", "").replace("]", "").replace('"', "")
            for locationid in cleaned.split(','):
                # "[1, 2]" leaves a leading blank that would break the member name.
                locationid = locationid.strip()
                if not locationid:
                    continue
                area_all = get_area_by_locationid(locationid)
                if not area_all:
                    continue
                for area in area_all.split(','):
                    # Must be the same key the publishing path writes to,
                    # otherwise the member is never found.
                    key = _ad_set_key(gameid, channelid, locationid, area)
                    values = f"{ad_id}_{locationid}"
                    if my_redis.sismember(key, values):
                        my_redis.srem(key, values)
                        my_redis.expire(f"adinfo::{values}::info", 1)
                        log.info(f"remove {ad_id} from {key} success!")
        except Exception:
            log.error("拆解过期数据出错!", exc_info=True)


def send_cache_data():
    """Refresh the Redis ad cache from MySQL.

    First publishes every currently active ad to the Redis set of each
    location/area it targets, honouring the per-ad play limit ``ad_num``.
    Then removes ads that have expired or have status 3 or 4. A failure while
    handling one ad is logged and does not stop the remaining ads from being
    processed.
    """
    log.info("start update cache !")
    now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")

    # Check whether each ad belongs in the play list and whether its play
    # count has been used up.
    for line in _load_active_ads(now):
        try:
            _publish_ad(line)
        except Exception:
            log.error(f"publish ad failed! ad={line}", exc_info=True)

    # Delete expired data.
    log.info("remove expire data from cache!")
    _remove_expired_ads(now)
# Script entry point: run one cache refresh when executed directly.
if __name__ == "__main__":
    log.info('start!')
    send_cache_data()