redis remove key 之前判断该key是否存在

This commit is contained in:
pengtao 2019-07-17 19:50:33 +08:00
parent 3bd7da3a9f
commit 75b884d1c1
2 changed files with 174 additions and 176 deletions

View File

@ -22,173 +22,171 @@ limit = 3
def send_cache_data(): def send_cache_data():
mydb = MysqlBase(**mysql_promotion_config) mydb = MysqlBase(**mysql_promotion_config)
log.info("start update cache !") log.info("start update cache !")
now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S") now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
all = [] all = []
# 添加无天数限定的记录 # 添加无天数限定的记录
get_full_data = f"select a.id,a.name,a.ad_num,a.ad_title,a.ad_body,a.ad_image,a.ad_url,a.ad_sort,a.companyid,a.locationid,a.gameid,b.appid from ad a,company b where a.companyid=b.id and a.begin_time='{BEGIN}' or a.end_time='{END}'" get_full_data = f"select a.id,a.name,a.ad_num,a.ad_title,a.ad_body,a.ad_image,a.ad_url,a.ad_sort,a.companyid,a.locationid,a.gameid,b.appid from ad a,company b where a.companyid=b.id and a.begin_time='{BEGIN}' or a.end_time='{END}'"
full_data = mydb.query(get_full_data) full_data = mydb.query(get_full_data)
if full_data: if full_data:
for line in full_data: for line in full_data:
if line: if line:
log.info(f"line was {line}") log.info(f"line was {line}")
item = {} item = {}
try: try:
item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item[ item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item['ad_url'], \
'ad_url'], item['ad_sort'], item['companyid'], item['locationid'], item['gameid'], item[ item['ad_sort'], item['companyid'], item['locationid'], item['gameid'], item['appid'] = line
'appid'] = line all.append(item)
all.append(item) except Exception:
except Exception: log.error("split data failed", exc_info=True)
log.error("split data failed", exc_info=True)
# 添加有天数限定的记录 # 添加有天数限定的记录
get_data_sql = f"select a.id,a.name,a.ad_num,a.ad_title,a.ad_body,a.ad_image,a.ad_url,a.ad_sort,a.companyid,a.locationid,a.gameid,b.appid from ad a,company b where a.companyid=b.id and '{now}'>a.begin_time and '{now}'<a.end_time ;" get_data_sql = f"select a.id,a.name,a.ad_num,a.ad_title,a.ad_body,a.ad_image,a.ad_url,a.ad_sort,a.companyid,a.locationid,a.gameid,b.appid from ad a,company b where a.companyid=b.id and '{now}'>a.begin_time and '{now}'<a.end_time ;"
data = mydb.query(get_data_sql) data = mydb.query(get_data_sql)
if data: if data:
for line in data: for line in data:
if line: if line:
item = {} item = {}
try: try:
item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item[ item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item['ad_url'], \
'ad_url'], item['ad_sort'], item['companyid'], item['locationid'], item['gameid'], item[ item['ad_sort'], item['companyid'], item['locationid'], item['gameid'], item['appid'] = line
'appid'] = line all.append(item)
all.append(item) except Exception:
except Exception: log.error("split data failed", exc_info=True)
log.error("split data failed", exc_info=True) # 检查ID是否存在播放列表中以及播放次数是否完毕
# 检查ID是否存在播放列表中以及播放次数是否完毕 if all:
if all: # log.info(f"get data was {all}!\n")
# log.info(f"get data was {all}!\n") for line in all:
for line in all: key = f"ad::{line.get('gameid', 0)}::{line.get('locationid', 0)}"
key = f"ad::{line.get('gameid', 0)}::{line.get('locationid', 0)}"
if int(line['ad_num']) > 0: if int(line['ad_num']) > 0:
num = my_redis.get(f"ad::{line['id']}::num") num = my_redis.get(f"ad::{line['id']}::num")
if not num: if not num:
num = 0 num = 0
if int(line['ad_num']) > int(num): if int(line['ad_num']) > int(num):
my_redis.sadd(key, line['id']) my_redis.sadd(key, line['id'])
my_redis.expire(key, 120) my_redis.expire(key, 120)
n = int(line['ad_num']) - int(num) n = int(line['ad_num']) - int(num)
log.info(f"add {line['id']} to {key} ,num was {line['ad_num']},limit was {n}!") log.info(f"add {line['id']} to {key} ,num was {line['ad_num']},limit was {n}!")
else: else:
if my_redis.sismember(key, line['id']): if my_redis.sismember(key, line['id']):
my_redis.srem(key, line['id']) my_redis.srem(key, line['id'])
log.info(f"remove {line['id']} from {key}!") log.info(f"remove {line['id']} from {key}!")
elif int(line['ad_num']) == 0: elif int(line['ad_num']) == 0:
my_redis.sadd(key, line['id']) my_redis.sadd(key, line['id'])
my_redis.expire(key, 120) my_redis.expire(key, 120)
log.info(f"add {line['id']} to {key} ,num was unlimit !") log.info(f"add {line['id']} to {key} ,num was unlimit !")
else: else:
log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}") log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}")
if not my_redis.exists(f"ad::{line['id']}::info"): if not my_redis.exists(f"ad::{line['id']}::info"):
my_redis.hmset(f"ad::{line['id']}::info", line) my_redis.hmset(f"ad::{line['id']}::info", line)
my_redis.expire(f"ad::{line['id']}::info", 3600 * 24 * 7) my_redis.expire(f"ad::{line['id']}::info", 3600 * 24 * 7)
log.info(f"add ad::{line['id']}::info to redis!") log.info(f"add ad::{line['id']}::info to redis!")
# 删除过期的数据 # 删除过期的数据
log.info("remove expire data from cache!") log.info("remove expire data from cache!")
expire_sql = f"select id,gameid,locationid from ad where '{now}'>end_time" expire_sql = f"select id,gameid,locationid from ad where '{now}'>end_time"
remove_data = mydb.query(expire_sql) remove_data = mydb.query(expire_sql)
if remove_data: if remove_data:
for line in remove_data: for line in remove_data:
try: try:
id, gameid, locationid = line id, gameid, locationid = line
key = f"{gameid}:{locationid}" key = f"{gameid}:{locationid}"
if my_redis.sismember(key, id): if my_redis.sismember(key, id):
my_redis.srem(key, id) my_redis.srem(key, id)
log.info(f"remove {id} from {key} success!") log.info(f"remove {id} from {key} success!")
except Exception: except Exception:
log.error("拆解过期数据出错!", exc_info=True) log.error("拆解过期数据出错!", exc_info=True)
class DispatchHandler(tornado.web.RequestHandler): class DispatchHandler(tornado.web.RequestHandler):
def get(self): def get(self):
if self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'selfChecking': if self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'selfChecking':
self._selfCheckingHandler() self._selfCheckingHandler()
elif self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'getAdList': elif self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'getAdList':
self._selfGetAdList() self._selfGetAdList()
elif self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'upAdRecording': elif self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'upAdRecording':
self._upAdRecording() self._upAdRecording()
else: else:
self.write("pls check args!") self.write("pls check args!")
def _upAdRecording(self): def _upAdRecording(self):
try:
adid = self.get_query_argument('adid')
except Exception:
result = {'errcode': 2, "errmsg": f"get args failed`"}
log.error(result, exc_info=True)
self.write({'errcode': 1, "errmsg": 'get adid failed!'})
if adid:
key = f"ad::{adid}::num"
my_redis.incr(key, amount=1)
self.write({'errcode': 0, "errmsg": '', "message": f"{adid} incr success!"})
def _selfCheckingHandler(self):
self.write(json.dumps({
'errcode': 0, 'errmsg': '', 'healthy': 1, 'max_rundelay': 10
}, separators=(',', ':')))
def _selfGetAdList(self):
try:
input = json.loads(self.get_query_argument('body'))
gameid = input['gameid']
locationid = input['locationid']
except Exception as e:
result = {'errcode': 2, "errmsg": f"get args failed,{str(e)}"}
log.error(result)
self.write_error(2)
return False
if gameid and locationid:
key = f"ad::{gameid}::{locationid}"
ids = my_redis.smembers(key)
info = []
if not ids:
result = {'errcode': 0, "errmsg": '', "message": {"totoal": len(info), "result": info}}
else:
try: try:
adid = self.get_query_argument('adid') # 如果取得的记录条数大于预设,扔掉多余的记录,当前采用的是随机选择,以后可能需要添加加权选择
except Exception: id_list = []
result = {'errcode': 2, "errmsg": f"get args failed`"} if limit < len(ids):
log.error(result, exc_info=True) nums = limit
self.write({'errcode': 1, "errmsg": 'get adid failed!'}) else:
nums = len(ids)
if adid: for i in range(nums):
key = f"ad::{adid}::num" while 1:
my_redis.incr(key, amount=1) new = my_redis.srandmember(key)
self.write({'errcode': 0, "errmsg": '', "message": f"{adid} incr success!"}) if new not in id_list:
id_list.append(new)
break
def _selfCheckingHandler(self): for id in id_list:
self.write(json.dumps({ temp = my_redis.hgetall(f"ad::{id}::info")
'errcode': 0, 'errmsg': '', 'healthy': 1, 'max_rundelay': 10 info.append(temp)
}, separators=(',', ':'))) result = {'errcode': 0, "errmsg": '', "message": {"totoal": len(info), "result": info}}
def _selfGetAdList(self):
try:
input = json.loads(self.get_query_argument('body'))
gameid = input['gameid']
locationid = input['locationid']
except Exception as e: except Exception as e:
result = {'errcode': 2, "errmsg": f"get args failed,{str(e)}"} result = {'errcode': 1, "errmsg": e}
log.error(result) self.write(result)
self.write_error(2) else:
return False result = {'errcode': 2, "errmsg": f"get args failed!"}
if gameid and locationid: self.write(result)
key = f"ad::{gameid}::{locationid}"
ids = my_redis.smembers(key)
info = []
if not ids:
result = {'errcode': 0, "errmsg": '', "message": {"totoal": len(info), "result": info}}
else:
try:
# 如果取得的记录条数大于预设,扔掉多余的记录,当前采用的是随机选择,以后可能需要添加加权选择
id_list = []
if limit < len(ids):
nums = limit
else:
nums = len(ids)
for i in range(nums):
while 1:
new = my_redis.srandmember(key)
if new not in id_list:
id_list.append(new)
break
for id in id_list:
temp = my_redis.hgetall(f"ad::{id}::info")
info.append(temp)
result = {'errcode': 0, "errmsg": '', "message": {"totoal": len(info), "result": info}}
except Exception as e:
result = {'errcode': 1, "errmsg": e}
self.write(result)
else:
result = {'errcode': 2, "errmsg": f"get args failed!"}
self.write(result)
def make_app(): def make_app():
return tornado.web.Application([(r"/webapp/index[\.]php", DispatchHandler)]) return tornado.web.Application([(r"/webapp/index[\.]php", DispatchHandler)])
if __name__ == "__main__": if __name__ == "__main__":
print('start!') print('start!')
send_cache_data() send_cache_data()
app = make_app() app = make_app()
app.listen(ad_list_interface_port) app.listen(ad_list_interface_port)
tornado.ioloop.PeriodicCallback(send_cache_data, 60000).start() tornado.ioloop.PeriodicCallback(send_cache_data, 60000).start()
tornado.ioloop.IOLoop.current().start() tornado.ioloop.IOLoop.current().start()

View File

@ -14,32 +14,32 @@ logging.basicConfig(level=logging.INFO, stream=sys.stdout)
class Cos_sdk(Resource): class Cos_sdk(Resource):
def __init__(self): def __init__(self):
secret_id = os.environ.get('cos_id') secret_id = os.environ.get('cos_id')
secret_key = os.environ.get('cos_key') secret_key = os.environ.get('cos_key')
region = 'ap-beijing' # 替换为用户的 Region region = 'ap-beijing' # 替换为用户的 Region
token = None # 使用临时密钥需要传入 Token默认为空可不填 token = None # 使用临时密钥需要传入 Token默认为空可不填
scheme = 'https' # 指定使用 http/https 协议来访问 COS默认为 https可不填 scheme = 'https' # 指定使用 http/https 协议来访问 COS默认为 https可不填
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme) config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
# 2. 获取客户端对象 # 2. 获取客户端对象
self.client = CosS3Client(config) self.client = CosS3Client(config)
self.bucket = "client-1256832210" self.bucket = "client-1256832210"
self.base_dirs = "/ad" self.base_dirs = "/ad"
self.url = "https://resource.kingsome.cn" self.url = "https://resource.kingsome.cn"
def post(self): def post(self):
try: try:
import uuid import uuid
filename = "{}_{}".format(uuid.uuid1(), request.files.get('image-file').filename) filename = "{}_{}".format(uuid.uuid1(), request.files.get('image-file').filename)
body = request.files.get('image-file').read() body = request.files.get('image-file').read()
except Exception as e: except Exception as e:
error = f"get filename={filename},body={body} , 'error' = {e}" error = f"get filename={filename},body={body} , 'error' = {e}"
return jsonify({'code': 500, 'message': error}) return jsonify({'code': 500, 'message': error})
cos_filename = f"{self.base_dirs}/{filename}" cos_filename = f"{self.base_dirs}/{filename}"
url = f"{self.url}/{cos_filename}" url = f"{self.url}/{cos_filename}"
response = self.client.put_object(Bucket=self.bucket, Body=body, Key=cos_filename, StorageClass='STANDARD', response = self.client.put_object(Bucket=self.bucket, Body=body, Key=cos_filename, StorageClass='STANDARD',
EnableMD5=False) EnableMD5=False)
print(response) print(response)
return jsonify({'code': 200, 'message': {'url': url, 'response': response}}) return jsonify({'code': 200, 'message': {'url': url, 'response': response}})