# -*- coding: utf-8 -*- # 推广系统对外接口,提供与客户端之间的广告信息接口及每分钟执行一次的缓存变更操作 # http://ad.kingsome.cn/webapp/index.php?c=Ops&a=getAdList&body={"gameid":1004,"locationid":1001} # http://ad.kingsome.cn/webapp/index.php?c=Ops&a=upAdRecording&adid=1002 import tornado.ioloop import tornado.web import json from myredis.myredis import my_redis import datetime from mysql.mmysql import MysqlBase from config import mysql_promotion_config from log.mylog import define_logger import logging from config import BEGIN, END, ad_list_interface_port from tornado import gen import pdb define_logger("/data/logs/ad_interface_tornado.log") log = logging.getLogger(__name__) limit = 10 def send_cache_data(): mydb = MysqlBase(**mysql_promotion_config) log.info("start update cache !") now = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S") all = [] # # 添加无天数限定的记录 # get_full_data = f"""SELECT # a.id, # a.name, # a.ad_num, # a.ad_title, # a.ad_body, # a.ad_image, # a.jump_param, # a.ad_sort, # a.companyid, # a.gameid, # a.channelid, # a.jump_status, # b.area, # b.type, # b.mode, # a.locationid # FROM # (SELECT # id, # name, # ad_num, # ad_title, # ad_body, # ad_image, # jump_param, # ad_sort, # companyid, # gameid, # channelid, # jump_status, # locationid # FROM # ad # WHERE # status=1 # AND in_used=1 AND (begin_time='{BEGIN}' OR end_time='{END}')) a , # (SELECT # id, # area, # type, # mode # FROM # location) b # WHERE # a.locationid=b.id""" # full_data = mydb.query(get_full_data) # # if full_data: # for line in full_data: # if line: # log.info(f"line was {line}") # item = {} # try: # item['id'], item['name'], item['ad_num'], item['ad_title'], item['ad_body'], item['ad_image'], item[ # 'jump_param'], item['ad_sort'], item['companyid'], item['gameid'], item['channelid'], item[ # 'jump_status'], item['area'], item['type'], item['mode'], item['locationid'] = line # all.append(item) # except Exception: # log.error("split data failed", exc_info=True) # 添加有天数限定的记录 get_data_sql = f"""select a.id, a.name, a.ad_num, a.ad_title, a.ad_body, a.ad_image, a.jump_param, a.ad_sort, a.companyid, a.gameid, a.channelid, a.jump_status, b.area, b.type, b.mode, a.locationid from (select id, name, ad_num, ad_title, ad_body, ad_image, jump_param, ad_sort, companyid, gameid, channelid, jump_status, locationid from ad where status=1 and in_used=1 and '{now}'> begin_time and '{now}' 0: num = my_redis.get(f"ad::{line['id']}::num") if not num: num = 0 if int(line['ad_num']) > int(num): my_redis.sadd(key, line['id']) my_redis.expire(key, 120) n = int(line['ad_num']) - int(num) log.info(f"add {line['id']} to {key} ,num was {line['ad_num']},limit was {n}!") else: if my_redis.sismember(key, line['id']): my_redis.srem(key, line['id']) log.info(f"remove {line['id']} from {key}!") elif int(line['ad_num']) == 0: my_redis.sadd(key, line['id']) my_redis.expire(key, 120) log.info(f"add {line['id']} to {key} ,num was unlimit !") else: log.error(f"get ad_num from mysql failed! ad_num={line['ad_num']}") if not my_redis.exists(f"ad::{line['id']}::info"): # remove some filed from adinfo remove_list = ('ad_num', 'gameid') for item in remove_list: line.pop(item) my_redis.hmset(f"ad::{line['id']}::info", line) my_redis.expire(f"ad::{line['id']}::info", 120) # my_redis.expire(f"ad::{line['id']}::info", 3600 * 24 * 7) log.info(f"add ad::{line['id']}::info to redis!") # 删除过期的数据 log.info("remove expire data from cache!") expire_sql = f"""select a.id,a.gameid,a.channelid,a.locationid,a.status,b.area from (select id,gameid,channelid,locationid,status from ad where '{now}'> end_time or status in (3,4) ) a , (select id, area from location) b where a.locationid=b.id""" remove_data = mydb.query(expire_sql) if remove_data: for line in remove_data: try: id, gameid, channelid, locationid, _, area = line key = f"ad::{gameid}::{channelid}::{area.split(',')[0].strip()}::{locationid}" # key = f"{gameid}:{locationid}" if my_redis.sismember(key, id): my_redis.srem(key, id) my_redis.expire(f"ad::{id}::info", 1) log.info(f"remove {id} from {key} success!") except Exception: log.error("拆解过期数据出错!", exc_info=True) class DispatchHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): if self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'selfChecking': self._selfCheckingHandler() elif self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'getAdList': yield self._selfGetAdList() elif self.get_query_argument('c') == 'Ops' and self.get_query_argument('a') == 'upAdRecording': yield self._upAdRecording() else: self.write("pls check args!") def _upAdRecording(self): try: adid = self.get_query_argument('adid') except Exception: result = {'errcode': 2, "errmsg": f"get args failed`"} log.error(result, exc_info=True) self.write({'errcode': 1, "errmsg": 'get adid failed!'}) if adid: key = f"ad::{adid}::num" my_redis.incr(key, amount=1) self.write({'errcode': 0, "errmsg": '', "message": f"{adid} incr success!"}) def _selfCheckingHandler(self): self.write(json.dumps({ 'errcode': 0, 'errmsg': '', 'healthy': 1, 'max_rundelay': 10 }, separators=(',', ':'))) def _selfGetAdList(self): try: input = json.loads(self.get_query_argument('body')) gameid = input['gameid'] channelid=input.get('channelid',None) or 6001 area=input.get('area',0) locationid = input.get('locationid',0) except Exception as e: result = {'errcode': 2, "errmsg": f"get args failed,{str(e)}"} log.error(result) return self.write_error(2) if gameid and locationid: key_word = f"ad::{gameid}::{channelid}::*::{locationid}" elif gameid and area: key_word = f"ad::{gameid}::{channelid}::{area}*" else: pass ad_keys = my_redis.keys(key_word) ids = [] for ad_key in ad_keys: adlists = my_redis.smembers(ad_key) try: for key in adlists: ids.append(key) except Exception: log.error(f"get redis data failed!", exc_info=True) return self.write({'errcode': 2, "errmsg": f"get redis data failed!"}) dist_ids = list(set(ids)) info = [] if not dist_ids: result = {'errcode': 0, "errmsg": '', "message": {"totoal": len(info), "result": info}} else: try: # 如果取得的记录条数大于预设,扔掉多余的记录,当前采用的是随机选择,以后可能需要添加加权选择 id_list = [] if limit < len(dist_ids): nums = limit else: nums = len(dist_ids) for i in range(nums): while 1: # new = my_redis.srandmember(key) import random new = random.choice(dist_ids) if new not in id_list: id_list.append(new) break for id in id_list: temp = my_redis.hgetall(f"ad::{id}::info") info.append(temp) result = {'errcode': 0, "errmsg": '', "message": {"totoal": len(info), "result": info}} except Exception as e: result = {'errcode': 1, "errmsg": e} return self.write(result) def make_app(): return tornado.web.Application([(r"/webapp/index[\.]php", DispatchHandler)]) if __name__ == "__main__": print('start!') send_cache_data() app = make_app() app.listen(ad_list_interface_port) tornado.ioloop.PeriodicCallback(send_cache_data, 60000).start() tornado.ioloop.IOLoop.current().start()