From ee0e3164e255e9c88482d8bf1495da6942b43ec5 Mon Sep 17 00:00:00 2001 From: pengtao Date: Sun, 29 Sep 2019 11:47:47 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BC=93=E5=AD=98=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E7=94=9F=E6=88=90=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ad_tasks.py | 185 +--------------------------------------------- clear_redis.py | 32 ++++++++ ops/ad_produce.py | 154 ++++++++++++++++++++++++++++++++++++++ ops/base.py | 13 ++++ 4 files changed, 201 insertions(+), 183 deletions(-) create mode 100644 clear_redis.py create mode 100644 ops/ad_produce.py diff --git a/ad_tasks.py b/ad_tasks.py index 6f1e669..e1064a1 100644 --- a/ad_tasks.py +++ b/ad_tasks.py @@ -1,187 +1,6 @@ # -*- coding: utf-8 -*- -from myredis.myredis import my_redis -import datetime -from mysql.mmysql import MysqlBase -from prod_config import mysql_promotion_config, log_path, info_expire, adkey_expire -from log.mylog import define_logger -import logging -import json -import copy -define_logger(f"{log_path}/ad_tasks.log") -log = logging.getLogger(__name__) - -limit = 100 - -mydb = MysqlBase(**mysql_promotion_config) -remove_list = ('ad_num', 'gameid') - - -def get_area_by_locationid(ldid): - area = [] - sql = f"select area from location where id={ldid}" - data = mydb.query(sql) - if data: - return data[0][0].strip('[]').replace('"', "") - else: - log.error(f"get area from location failed!,sql={sql}") - return area - - -def split_ad_info(localtionid, line): - # 从locationid获取位置相关属性,与ad信息打包,产生提供给接口用的数据 - new = copy.deepcopy(line) - location_sql = f"select x,y,x_offset,y_offset,type,mode,ld_property from location WHERE id={localtionid}" - data = mydb.query(location_sql) - try: - new['x'], new['y'], new['x_offset'], new['y_offset'], new['type'], new['mode'], new['ld_property'] = data[0] - new['area'] = f"{new['area']},{new['x']},{new['y']},{new['x_offset']},{new['y_offset']}" - except Exception: - log.error(f"get localtion info by sql={location_sql} error!", exc_info=True) - return None - for item in remove_list: - new.pop(item) - return new - - -def sadd_adkey(key, line): - - locationid = line['locationid'] - redis_key = f"{line['id']}_{locationid}" - my_redis.sadd(key, redis_key) - my_redis.expire(key, adkey_expire) - #log.info(f"add {redis_key} 2 {key}!") - info_key = f"adinfo::{redis_key}::info" - new_line = split_ad_info(locationid, line) - - if new_line: - my_redis.hmset(info_key, new_line) - my_redis.expire(info_key, info_expire) - #log.info(f"add info {new_line} 2 {info_key}!") - else: - log.error(f"split adinfo about locationid failed! localtionid={locationid} ") - - - -def send_cache_data(): - log.info("start produce cache !") - now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S") - all = [] - get_data_sql = f"""select - id, - name, - ad_num, - ad_title, - ad_body, - ad_image, - jump_param, - ad_sort, - companyid, - gameid, - channelid, - jump_status, - locationid , - ad_property, - createtime - from - ad - where - status=1 - and in_used=1 - and '{now}'> begin_time - and '{now}' 0: - num = my_redis.get(f"adnum::{line['id']}_{locationid}::num") - print(f" get {line['id']} {locationid} num was {num}!") - if not num: - num = 0 - if int(line['ad_num']) > int(num): - sadd_adkey(key, line) - else: - redis_key = f"{line['id']}_{locationid}" - if my_redis.sismember(key, redis_key): - my_redis.srem(key, redis_key) - log.info(f"remove {redis_key} from {key}!") - else: - log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}") - log.info("end produce cache!") - - # # 删除过期的数据 - # log.info("remove expire data from cache!") - # expire_sql = f"""select - # id,gameid,channelid,locationid,status - # from - # ad - # where - # '{now}'> end_time - # or status in (3,4) or in_used=0""" - # remove_data = mydb.query(expire_sql) - # if remove_data: - # for line in remove_data: - # try: - # id, gameid, channelid, locationids, status = line - # for locationid in locationids.replace("[", "").replace("]", "").replace('"', "").split(','): - # locationid = locationid.strip() - # area_all = get_area_by_locationid(locationid) - # if area_all: - # for area in area_all.split(','): - # area = area.strip() - # key = f"ad::{gameid}_{locationid}::{channelid}::{area}::{locationid}" - # values = f"{id}_{locationid}" - # if my_redis.sismember(key, values): - # my_redis.srem(key, values) - # my_redis.expire(f"adinfo::{values}::info", 1) - # log.info(f"remove {id} from {key} success!") - # except Exception: - # log.error("拆解过期数据出错!", exc_info=True) +from ops.ad_produce import produce_task if __name__ == "__main__": #log.info('start!') - send_cache_data() + produce_task() diff --git a/clear_redis.py b/clear_redis.py new file mode 100644 index 0000000..3475258 --- /dev/null +++ b/clear_redis.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +from myredis.myredis import my_redis +from log.mylog import define_logger +import logging + +define_logger("/data/logs/ops/clear_redis.log") +log = logging.getLogger(__name__) + + +def find_key(partten): + keys = [] + for key in my_redis.keys(pattern=partten): + if key: + keys.append(key) + return keys + + +def clear_keys(keys): + for key in keys: + my_redis.expire(key, 1) + log.info(f"expirt {len(keys)} success!") + + +def main(): + partten = "ad::*::num" + keys = find_key(partten) + if keys: + clear_keys(keys) + + +if __name__ == "__main__": + main() diff --git a/ops/ad_produce.py b/ops/ad_produce.py new file mode 100644 index 0000000..2a284c9 --- /dev/null +++ b/ops/ad_produce.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +from myredis.myredis import my_redis +import datetime +from mysql.mmysql import MysqlBase +from prod_config import mysql_promotion_config, log_path, info_expire, adkey_expire +from log.mylog import define_logger +import logging +import json +import copy +from ops.base import split_list + +define_logger(f"{log_path}/ad_tasks.log") +log = logging.getLogger(__name__) + +limit = 100 + +mydb = MysqlBase(**mysql_promotion_config) +remove_list = ('ad_num', 'gameid') + + +def get_area_by_locationid(ldid): + area = [] + sql = f"select area from location where id={ldid}" + data = mydb.query(sql) + if data: + return data[0][0].strip('[]').replace('"', "") + else: + log.error(f"get area from location failed!,sql={sql}") + return area + + +def split_ad_info(localtionid, line): + # 从locationid获取位置相关属性,与ad信息打包,产生提供给接口用的数据 + new = copy.deepcopy(line) + location_sql = f"select x,y,x_offset,y_offset,type,mode,ld_property from location WHERE id={localtionid}" + data = mydb.query(location_sql) + try: + new['x'], new['y'], new['x_offset'], new['y_offset'], new['type'], new['mode'], new['ld_property'] = data[0] + new['area'] = f"{new['area']},{new['x']},{new['y']},{new['x_offset']},{new['y_offset']}" + except Exception: + log.error(f"get localtion info by sql={location_sql} error!", exc_info=True) + return None + for item in remove_list: + new.pop(item) + return new + + +def sadd_adkey(key, line): + locationid = line['locationid'] + redis_key = f"{line['id']}_{locationid}" + my_redis.sadd(key, redis_key) + my_redis.expire(key, adkey_expire) + # log.info(f"add {redis_key} 2 {key}!") + info_key = f"adinfo::{redis_key}::info" + new_line = split_ad_info(locationid, line) + + if new_line: + my_redis.hmset(info_key, new_line) + my_redis.expire(info_key, info_expire) # log.info(f"add info {new_line} 2 {info_key}!") + else: + log.error(f"split adinfo about locationid failed! localtionid={locationid} ") + + +def produce_task(): + log.info("start produce cache !") + now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S") + all = [] + get_data_sql = f"""select + id, + name, + ad_num, + ad_title, + ad_body, + ad_image, + jump_param, + ad_sort, + companyid, + gameid, + channelid, + jump_status, + locationid , + ad_property, + createtime + from + ad + where + status=1 + and in_used=1 + and '{now}'> begin_time + and '{now}' 0: + num = my_redis.get(f"adnum::{line['id']}_{locationid}::num") + print(f" get {line['id']} {locationid} num was {num}!") + if not num: + num = 0 + if int(line['ad_num']) > int(num): + sadd_adkey(key, line) + else: + redis_key = f"{line['id']}_{locationid}" + if my_redis.sismember(key, redis_key): + my_redis.srem(key, redis_key) + log.info(f"remove {redis_key} from {key}!") + else: + log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}") + except Exception: + log.error(f"split {line} failed!", exc_info=True) + log.info("end produce cache!") diff --git a/ops/base.py b/ops/base.py index 9a29f3b..c20163f 100644 --- a/ops/base.py +++ b/ops/base.py @@ -11,3 +11,16 @@ def my_quote(data): return urlencode(data) elif isinstance(data, str): return quote(data) + + +def split_list(field, data_list): + data = dict() + if not data_list: + return None + if not (isinstance(field, list) and isinstance(data_list, list)): + return None + if len(field) != len(data_list): + return None + for i, v in enumerate(data_list): + data[v] = field[i] + return data