192 lines
6.6 KiB
Python
192 lines
6.6 KiB
Python
# -*- coding: utf-8 -*-
|
||
from myredis.myredis import my_redis
|
||
import datetime
|
||
from mysql.mmysql import MysqlBase
|
||
from prod_config import mysql_promotion_config
|
||
from log.mylog import define_logger
|
||
import logging
|
||
from prod_config import BEGIN, END, ad_list_interface_port
|
||
import pdb
|
||
import json
|
||
define_logger("/data/logs/ad_tasks.log")
|
||
log = logging.getLogger(__name__)
|
||
|
||
limit = 100
|
||
|
||
ad_list_interface_port = 5014
|
||
BEGIN = '1999-01-01'
|
||
END = '3000-01-01'
|
||
|
||
|
||
|
||
def send_cache_data():
|
||
mydb = MysqlBase(**mysql_promotion_config)
|
||
log.info("start update cache !")
|
||
now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
|
||
all = []
|
||
|
||
get_data_sql = f"""select
|
||
a.id,
|
||
a.name,
|
||
a.ad_num,
|
||
a.ad_title,
|
||
a.ad_body,
|
||
a.ad_image,
|
||
a.jump_param,
|
||
a.ad_sort,
|
||
a.companyid,
|
||
a.gameid,
|
||
a.channelid,
|
||
a.jump_status,
|
||
a.ad_property,
|
||
b.area,
|
||
b.type,
|
||
b.mode,
|
||
a.locationid
|
||
from
|
||
(select
|
||
id,
|
||
name,
|
||
ad_num,
|
||
ad_title,
|
||
ad_body,
|
||
ad_image,
|
||
jump_param,
|
||
ad_sort,
|
||
companyid,
|
||
gameid,
|
||
channelid,
|
||
jump_status,
|
||
locationid ,
|
||
ad_property
|
||
from
|
||
ad
|
||
where
|
||
status=1
|
||
and in_used=1
|
||
and '{now}'> begin_time
|
||
and '{now}'<end_time ) a ,
|
||
(select
|
||
id,
|
||
area,
|
||
type,
|
||
mode
|
||
from
|
||
location) b
|
||
where
|
||
a.locationid=b.id"""
|
||
|
||
data = mydb.query(get_data_sql)
|
||
if data:
|
||
for line in data:
|
||
|
||
if line:
|
||
item = {}
|
||
try:
|
||
item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item[
|
||
'jump_param'], item['ad_sort'], item['companyid'], item['gameid'], item['channelid'], item[
|
||
'jump_status'], item['ad_property'], item['area'], item['type'], item['mode'], item[
|
||
'locationid'] = line
|
||
try:
|
||
item['x_offset'] = item['area'].split(',')[-2].strip()
|
||
except:
|
||
item['x_offset'] = 0
|
||
try:
|
||
item['y_offset'] = item['area'].split(',')[-1].strip().strip(')')
|
||
except:
|
||
item['y_offset'] = 0
|
||
|
||
if item.get('jump_param', "") and (item['ad_property'].find("jump_param") == -1):
|
||
try:
|
||
temp = {}
|
||
temp['jump_param'] = item.get('jump_param', "")
|
||
if not item['ad_property']:
|
||
item['ad_property'] = {}
|
||
if not isinstance(item.get('ad_property', {}), dict):
|
||
if not item.get('ad_property', {}):
|
||
item['ad_property'] = {}.update(temp)
|
||
else:
|
||
item['ad_property'] = json.loads(
|
||
item.get('ad_property', {}).replace("‘", '"').replace("’", '"'))
|
||
item['ad_property'].update(temp)
|
||
item['ad_property'] = json.dumps(item['ad_property'])
|
||
except Exception:
|
||
log.error(f"write {item}", exc_info=True)
|
||
|
||
all.append(item)
|
||
except Exception:
|
||
log.error("split data failed", exc_info=True)
|
||
# 检查ID是否存在播放列表中,以及播放次数是否完毕
|
||
if all:
|
||
# log.info(f"get data was {all}!\n")
|
||
for line in all:
|
||
key = f"ad::{line.get('gameid', 0)}::{line.get('channelid', 0)}::{line.get('area').split(',')[0].strip()}::{line.get('locationid', 0)}"
|
||
if int(line['ad_num']) > 0:
|
||
num = my_redis.get(f"ad::{line['id']}::num")
|
||
if not num:
|
||
num = 0
|
||
if int(line['ad_num']) > int(num):
|
||
my_redis.sadd(key, line['id'])
|
||
my_redis.expire(key, 120)
|
||
n = int(line['ad_num']) - int(num)
|
||
log.info(f"add {line['id']} to {key} ,num was {line['ad_num']},limit was {n}!")
|
||
else:
|
||
if my_redis.sismember(key, line['id']):
|
||
my_redis.srem(key, line['id'])
|
||
log.info(f"remove {line['id']} from {key}!")
|
||
elif int(line['ad_num']) == 0:
|
||
my_redis.sadd(key, line['id'])
|
||
my_redis.expire(key, 120)
|
||
log.info(f"add {line['id']} to {key} ,num was unlimit !")
|
||
else:
|
||
log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}")
|
||
|
||
if not my_redis.exists(f"ad::{line['id']}::info"):
|
||
# remove some filed from adinfo
|
||
remove_list = ('ad_num', 'gameid')
|
||
for item in remove_list:
|
||
line.pop(item)
|
||
my_redis.hmset(f"ad::{line['id']}::info", line)
|
||
# my_redis.expire(f"ad::{line['id']}::info", 60)
|
||
my_redis.expire(f"ad::{line['id']}::info", 3600 * 24)
|
||
log.info(f"add ad::{line['id']}::info to redis!")
|
||
|
||
# 删除过期的数据
|
||
log.info("remove expire data from cache!")
|
||
|
||
expire_sql = f"""select
|
||
a.id,a.gameid,a.channelid,a.locationid,a.status,b.area
|
||
from
|
||
(select
|
||
id,gameid,channelid,locationid,status
|
||
from
|
||
ad
|
||
where
|
||
'{now}'> end_time
|
||
or status in (3,4) ) a ,
|
||
(select
|
||
id,
|
||
area
|
||
from
|
||
location) b
|
||
where
|
||
a.locationid=b.id"""
|
||
remove_data = mydb.query(expire_sql)
|
||
if remove_data:
|
||
for line in remove_data:
|
||
try:
|
||
id, gameid, channelid, locationid, _, area = line
|
||
key = f"ad::{gameid}::{channelid}::{area.split(',')[0].strip()}::{locationid}"
|
||
# key = f"{gameid}:{locationid}"
|
||
if my_redis.sismember(key, id):
|
||
my_redis.srem(key, id)
|
||
my_redis.expire(f"ad::{id}::info", 1)
|
||
log.info(f"remove {id} from {key} success!")
|
||
except Exception:
|
||
log.error("拆解过期数据出错!", exc_info=True)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
log.info('start!')
|
||
send_cache_data()
|