168 lines
6.7 KiB
Python
168 lines
6.7 KiB
Python
# -*- coding: utf-8 -*-
|
||
from myredis.myredis import my_redis
|
||
import datetime
|
||
from mysql.mmysql import MysqlBase
|
||
from prod_config import mysql_promotion_config, log_path, info_expire, adkey_expire
|
||
from log.mylog import define_logger
|
||
import logging
|
||
import json
|
||
import copy
|
||
from ops.base import split_list
|
||
from collections import defaultdict
|
||
|
||
|
||
# Logging setup: define_logger is handed the ad-task log path (presumably it
# attaches a file handler there -- confirm in log.mylog).
define_logger(f"{log_path}/ad_tasks.log")
log = logging.getLogger(__name__)

# NOTE(review): not referenced by any function in this module; looks like a
# batch size used elsewhere -- confirm before removing.
limit = 100

# Keys that split_ad_info() pops from each ad record before returning it.
remove_list = ("ad_num", "gameid")
|
||
|
||
|
||
def get_area_by_locationid(ldid):
    """Look up the ``area`` column of a single ``location`` row.

    Args:
        ldid: Location id; interpolated verbatim into the ``id=`` filter of
            the SQL statement.

    Returns:
        When the query yields rows: the first column of the first row with
        leading/trailing square brackets stripped and every double quote
        removed (a string).
        When the query yields nothing: an empty ``list`` (after logging an
        error). Note the type differs from the success case, so callers that
        use string methods on the result must check for it.
    """
    db = MysqlBase(**mysql_promotion_config)
    sql = f"select area from location where id={ldid}"
    rows = db.query(sql)

    # Guard clause: nothing found, report it and hand back the empty default.
    if not rows:
        log.error(f"get area from location failed!,sql={sql}")
        return []

    raw_area = rows[0][0]
    without_brackets = raw_area.strip('[]')
    return without_brackets.replace('"', "")
|
||
|
||
|
||
def split_ad_info(localtionid, line):
    """Bundle an ad record with the attributes of one location.

    Fetches the location attributes by id, merges them into a deep copy of
    the ad record and returns the copy as the data handed to the API layer.
    The caller's ``line`` is never modified.

    Args:
        localtionid: Location id; interpolated verbatim into the SQL filter.
        line: Ad record (dict). It must already hold an ``area`` entry and
            every key listed in ``remove_list``.

    Returns:
        The merged copy, with ``x``, ``y``, ``x_offset``, ``y_offset``,
        ``type``, ``mode`` and ``ld_property`` set from the location row,
        ``area`` rewritten to ``"<area>,<x>,<y>,<x_offset>,<y_offset>"`` and
        the ``remove_list`` keys popped. ``None`` if the location row could
        not be read or merged (the error is logged).

    Raises:
        KeyError: If a ``remove_list`` key is missing from the record.
    """
    db = MysqlBase(**mysql_promotion_config)
    merged = copy.deepcopy(line)
    location_sql = f"select x,y,x_offset,y_offset,type,mode,ld_property from location WHERE id={localtionid}"
    rows = db.query(location_sql)

    try:
        # Unpack first so that a missing or malformed row fails before any
        # key of the copy is touched.
        x, y, x_offset, y_offset, loc_type, mode, ld_property = rows[0]
        merged['x'] = x
        merged['y'] = y
        merged['x_offset'] = x_offset
        merged['y_offset'] = y_offset
        merged['type'] = loc_type
        merged['mode'] = mode
        merged['ld_property'] = ld_property
        merged['area'] = f"{merged['area']},{x},{y},{x_offset},{y_offset}"
    except Exception:
        log.error(f"get localtion info by sql={location_sql} error!", exc_info=True)
        return None

    # Drop the bookkeeping fields that are not part of the returned record.
    for unwanted in remove_list:
        merged.pop(unwanted)
    return merged
|
||
|
||
|
||
# Column order of the SELECT issued by produce_task(); each result row is
# zipped against this tuple to build the ad record.
_AD_FIELDS = (
    'id', 'name', 'ad_num', 'ad_title', 'ad_body', 'ad_image', 'jump_param',
    'ad_sort', 'companyid', 'gameid', 'channelid', 'jump_status',
    'locationid', 'ad_property', 'createtime',
)


def _merge_jump_param(item):
    """Fold ``item['jump_param']`` into the JSON stored in ``item['ad_property']``.

    Nothing happens when ``jump_param`` is empty or when ``ad_property``
    already mentions ``jump_param``. Otherwise ``ad_property`` is parsed
    (curly single quotes are normalised to double quotes first), the
    ``jump_param`` entry is added and the result is written back as a JSON
    string. ``item`` is only modified once the merge has fully succeeded.

    Raises:
        Exception: Whatever parsing/merging raises (e.g. invalid JSON); the
            caller decides how to report it.
    """
    jump_param = item.get('jump_param', "")
    raw_property = item['ad_property']
    # A NULL ad_property is treated like an empty one instead of crashing on
    # ``None.find`` (which used to drop the whole ad).
    if not jump_param or "jump_param" in (raw_property or ""):
        return

    if not raw_property:
        prop = {}
    elif isinstance(raw_property, dict):
        prop = raw_property
    else:
        prop = json.loads(raw_property.replace("‘", '"').replace("’", '"'))
    prop['jump_param'] = jump_param
    item['ad_property'] = json.dumps(prop)


def produce_task():
    """Rebuild the Redis ad cache from the currently active ads in MySQL.

    Steps:
        1. Select every ad with ``status=1``, ``in_used=1`` whose
           ``begin_time``/``end_time`` window contains the current time.
        2. Turn each row into a dict, format ``createtime`` as a string and
           merge ``jump_param`` into ``ad_property``.
        3. For every (ad, location, area) combination build
           - an ``adlist::<gameid>_<locationid>_<area>::<channelid>::<area>::<locationid>``
             entry listing the ``<id>_<locationid>_<area>`` keys to play, and
           - the ad info returned by ``split_ad_info`` for that key.
           An ad with ``ad_num == 0`` is always listed; with ``ad_num > 0``
           it is listed only while the counter stored under
           ``adnum::<id>_<locationid>::num`` is below ``ad_num``.
        4. Write the lists (JSON) and infos (hashes) to Redis with their
           expiry, then record ``ad_produce`` (1 = clean run, 0 = at least
           one record failed) and ``ad_produce_time`` (unix timestamp).

    Returns:
        None. All results are written to Redis.
    """
    mydb = MysqlBase(**mysql_promotion_config)
    now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    run_status = 1
    get_data_sql = f"""select
        id,
        name,
        ad_num,
        ad_title,
        ad_body,
        ad_image,
        jump_param,
        ad_sort,
        companyid,
        gameid,
        channelid,
        jump_status,
        locationid ,
        ad_property,
        createtime
    from
        ad
    where
        status=1
        and in_used=1
        and '{now}'> begin_time
        and '{now}'<end_time ;
    """

    # ---- Phase 1: rows -> ad dicts -------------------------------------
    ads = []
    for row in mydb.query(get_data_sql) or ():
        if not row:
            continue
        try:
            item = dict(zip(_AD_FIELDS, row))
            item['createtime'] = datetime.datetime.strftime(item['createtime'], "%Y-%m-%d %H:%M:%S")
        except Exception:
            log.error(f"split data failed,data={row}", exc_info=True)
            continue

        try:
            _merge_jump_param(item)
        except Exception:
            # The ad is still published, with its ad_property left as read
            # from the database; the run is flagged as not clean.
            run_status = 0
            log.error(f"write {item}", exc_info=True)
        ads.append(item)

    # ---- Phase 2: decide which ads belong in which play list -----------
    # Check whether each ad id should be in the play list and whether its
    # allowed number of plays has been used up.
    adlist = defaultdict(list)
    adinfo = dict()
    for line in ads:
        try:
            # Loop-invariant per ad: convert once instead of once per area.
            ad_num = int(line['ad_num'])
            locationids = json.loads(line.get('locationid'))
            for locationid in str(locationids).strip('[]').split(','):
                locationid = locationid.strip()
                area_all = get_area_by_locationid(locationid)
                if not isinstance(area_all, str):
                    # The lookup returns a list on failure (and has already
                    # logged it); skip this location but keep processing
                    # the ad's remaining locations.
                    run_status = 0
                    continue
                for area in area_all.split(','):
                    area = area.strip()
                    line['locationid'] = locationid
                    line['area'] = area
                    str01 = f"{line.get('gameid', 0)}_{locationid}_{area}"
                    key = "adlist::{0}::{1}::{2}::{3}".format(str01, line['channelid'], area, locationid)
                    redis_key = f"{line['id']}_{locationid}_{area}"

                    info = split_ad_info(locationid, line)
                    if info is None:
                        # No usable info (already logged): do not list an ad
                        # whose info hash cannot be written.
                        run_status = 0
                        continue
                    adinfo[redis_key] = info

                    if ad_num == 0:
                        adlist[key].append(redis_key)
                    elif ad_num > 0:
                        num = my_redis.get(f"adnum::{line['id']}_{locationid}::num") or 0
                        print(f" get {line['id']} {locationid} num was {num}!")
                        if ad_num > int(num):
                            # Append the ad's key; the original appended the
                            # ``adlist`` mapping itself, which made the later
                            # json.dumps fail on a circular reference.
                            adlist[key].append(redis_key)
                    else:
                        log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}")
        except Exception:
            log.error(f"split {line} failed!", exc_info=True)
            run_status = 0

    # ---- Phase 3: publish to Redis -------------------------------------
    if adlist:
        for key, members in adlist.items():
            my_redis.set(key, json.dumps(members))
            my_redis.expire(key, adkey_expire)

        for redis_key, info in adinfo.items():
            info_key = f"adinfo::{redis_key}::info"
            my_redis.hmset(info_key, info)
            my_redis.expire(info_key, info_expire)

    log.info(f"produce {len(adlist)} 2 redis!")
    my_redis.set("ad_produce", run_status)
    timestamp = int(datetime.datetime.now().timestamp())
    my_redis.set("ad_produce_time", timestamp)
|