# -*- coding: utf-8 -*-
"""Copy one day of rows from the TGA ``v_event_25`` view into InfluxDB.

Run as a script with an optional ``YYYY-MM-DD`` argument. When the script is
not given exactly one argument, yesterday's date is used.
"""
import datetime
import logging
import pdb  # kept for interactive debugging; no breakpoint is set in the job
import sys

from influxdb import InfluxDBClient, exceptions

from ops.mtga import FromTga
from ops.plog import define_logger

define_logger("/data/logs/ops/tap2influxdb.log")
log = logging.getLogger(__name__)


class TAP2Influx:
    """Query one ``$part_date`` partition from TGA and write it to InfluxDB."""

    def __init__(self, day):
        """Create the TGA and InfluxDB clients.

        Args:
            day: Partition date interpolated into the SQL filter; the script
                entry point passes a ``YYYY-MM-DD`` string.
        """
        # NOTE(review): the endpoint, the API token and the InfluxDB
        # credentials below are hard-coded in source. They should be moved to
        # configuration / environment and the token rotated.
        url = "http://10.10.3.17:8992/querySql"
        api_secret = "n9H4R32ZcjtSeN89ljCY6ESzTmOlnwwnOB3r4YsggnP5M1AXLtKtiS4sS1KKLOEQ"
        self.tga = FromTga(url=url, token=api_secret)
        self.day = day
        self.influx = InfluxDBClient('10.10.3.19', '8086', 'miles', 'aspect', 'tap_data')
        self.measurement = "tapdata"

    def _build_point(self, row):
        """Turn one 13-column result row into an InfluxDB point dict.

        Raises:
            TypeError, ValueError: if ``row`` cannot be unpacked into exactly
                the 13 selected columns.
        """
        (gameid, catename, cate, title, score, tags, reserve, watch,
         download, sell, review, topic, createdat) = row
        # The tag dict gets its own name so the ``tags`` column is still
        # available for ``fields`` below (it used to be overwritten).
        # NOTE(review): "catedat" looks like a typo for "createdat" and stores
        # the event time as a tag; left unchanged to keep the stored schema.
        point_tags = {"gameid": gameid, "catename": catename, "cate": cate, "catedat": createdat}
        # NOTE(review): assumes every column value is a type InfluxDB accepts
        # as a field value - confirm for the ``tags`` column in particular.
        fields = {"title": title, "score": score, "tags": tags, "reserve": reserve,
                  "watch": watch, "download": download, "sell": sell,
                  "review": review, "topic": topic}
        return {"measurement": self.measurement, "tags": point_tags, "fields": fields}

    def get_tga_data(self):
        """Fetch the rows for ``self.day`` and write them to InfluxDB.

        Rows that cannot be unpacked are logged and skipped. A failed write is
        logged and not re-raised. Nothing is written when there are no points.
        """
        # NOTE(review): ``self.day`` is interpolated directly into the SQL;
        # the script entry point validates it, other callers must do the same.
        sql = f"""SELECT gameid, catename, cate, title, score, tags, reserve, watch, download, sell, review, topic, "#event_time" FROM v_event_25 WHERE "$part_date"='{self.day}' """
        data = self.tga.get_data(sql)
        if not data:
            log.warning(f"no tga data returned for day={self.day}")
            return

        body = []
        for line in data:
            # Per-row handling: one malformed row no longer discards the rest.
            try:
                body.append(self._build_point(line))
            except (TypeError, ValueError):
                log.error(f"split data failed ,{line}", exc_info=True)

        if not body:
            return

        try:
            # write_points() is the client call that accepts a list of point
            # dicts; write() expects a single dict with a "points" key.
            self.influx.write_points(body)
        except Exception:
            log.error(f"write 2 influx failed,data={body}", exc_info=True)

    def run(self):
        """Run the export for the configured day."""
        self.get_tga_data()


def _resolve_day(argv):
    """Return the ``YYYY-MM-DD`` day to export for the given ``sys.argv``.

    With exactly one argument, that argument is validated and normalised to
    ``YYYY-MM-DD``; otherwise yesterday's date is returned.

    Raises:
        ValueError: if the argument is not a valid ``YYYY-MM-DD`` date.
    """
    if len(argv) == 2:
        raw = argv[1]
        try:
            parsed = datetime.datetime.strptime(raw, "%Y-%m-%d")
        except ValueError as err:
            # Report the raw argument; the parsed value does not exist here.
            log.error(f"split args with start time failed ,args={raw}", exc_info=True)
            raise ValueError(f"split args with start time failed ,args={raw}") from err
        # Pass a date string, matching the default branch, not a datetime.
        return parsed.strftime('%Y-%m-%d')
    return (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')


if __name__ == "__main__":
    tt = TAP2Influx(_resolve_day(sys.argv))
    tt.run()