From 221cf8032d4cdfcabc34670158a382957c08b814 Mon Sep 17 00:00:00 2001 From: pengtao Date: Wed, 27 Nov 2019 11:28:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0tap=202=20influxdb=E8=84=9A?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- taptap/tap2influxdb.py | 77 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 taptap/tap2influxdb.py diff --git a/taptap/tap2influxdb.py b/taptap/tap2influxdb.py new file mode 100644 index 0000000..e6cfa10 --- /dev/null +++ b/taptap/tap2influxdb.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- + +import pdb +from influxdb import InfluxDBClient, exceptions +from ops.mtga import FromTga +from ops.plog import define_logger +import logging +import datetime +import sys + +define_logger("/data/logs/ops/tap2influxdb.log") +log = logging.getLogger(__name__) + + +class TAP2Influx: + def __init__(self, day): + url = "http://10.10.3.17:8992/querySql" + api_secret = "n9H4R32ZcjtSeN89ljCY6ESzTmOlnwwnOB3r4YsggnP5M1AXLtKtiS4sS1KKLOEQ" + self.tga = FromTga(url=url, token=api_secret) + self.day = day + self.influx = InfluxDBClient('10.10.3.19', '8086', 'miles', 'aspect', 'tap_data') + self.measurement = "tapdata" + + def get_tga_data(self): + sql = f"""SELECT + gameid, + catename, + cate, + title, + score, + tags, + reserve, + watch, + download, + sell, + review, + topic, + "#event_time" + FROM + v_event_25 + WHERE + "$part_date"='{self.day}' """ + + body = list() + data = self.tga.get_data(sql) + if data: + try: + for line in data: + gameid, catename, cate, title, score, tags, reserve, watch, download, sell, review, topic, createdat = line + tags = {"gameid": gameid, "catename": catename, "cate": cate, "catedat": createdat} + fields = {"title": title, "score": score, "tags": tags, "reserve": reserve, "watch": watch, + "download": download, "sell": sell, "review": review, "topic": topic} + temp = {"measurement": self.measurement, "tags": tags, "fields": fields} + body.append(temp) + except Exception: + log.error(f"split data failed ,{line}", exc_info=True) + try: + self.influx.write(body) + except Exception: + log.error(f"write 2 influx failed,data={body}", exc_info=True) + + + def run(self): + self.get_tga_data() + + +if __name__ == "__main__": + if len(sys.argv) == 2: + try: + d = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d") + except Exception: + log.error(f"split args with start time failed ,args={d}", exc_info=True) + raise Exception(f"split args with start time failed ,args={d}") + else: + d = datetime.datetime.now() - datetime.timedelta(days=1) + tt = TAP2Influx(d) + tt.run()