datacollect/taptap/tap2influxdb.py
2019-11-27 11:42:59 +08:00

79 lines
2.5 KiB
Python

# -*- coding: utf-8 -*-
import pdb
from influxdb import InfluxDBClient, exceptions
from ops.mtga import FromTga
from ops.plog import define_logger
import logging
import datetime
import sys
# Initialise logging through the project helper, handing it this job's log file path.
# NOTE(review): presumably define_logger attaches file handlers globally — confirm in ops.plog.
define_logger("/data/logs/ops/tap2influxdb.log")
# Module-level logger named after this module; used by the class and the script entry below.
log = logging.getLogger(__name__)
class TAP2Influx:
    """Copy one day's rows of ``v_event_25`` from the TGA SQL endpoint into InfluxDB.

    The instance holds a ``FromTga`` query client, an ``InfluxDBClient`` and the
    day whose partition is queried. ``run()`` performs the whole transfer.
    """

    def __init__(self, day):
        """Create the TGA and InfluxDB clients.

        Args:
            day: Value substituted into the ``"$part_date"`` filter of the query.
                The script entry point passes a ``'YYYY-MM-DD'`` string.
        """
        # NOTE(review): endpoint, token and database credentials are hard-coded in
        # source; they should be moved to configuration/environment and rotated.
        url = "http://10.10.3.17:8992/querySql"
        api_secret = "n9H4R32ZcjtSeN89ljCY6ESzTmOlnwwnOB3r4YsggnP5M1AXLtKtiS4sS1KKLOEQ"
        self.tga = FromTga(url=url, token=api_secret)
        self.day = day
        # Positional arguments: host, port, username, password, database.
        self.influx = InfluxDBClient('10.10.3.19', '8086', 'miles', 'aspect', 'tap_data')
        self.measurement = "tapdata"

    def get_tga_data(self) -> None:
        """Query the rows for ``self.day`` and write them to InfluxDB as points.

        Each row is expected to unpack into the 13 selected columns, in query
        order. A row that cannot be unpacked is logged and skipped so that the
        remaining rows are still written. Nothing is written when the query
        returns no data or when no row could be converted. A failure of the
        InfluxDB write is logged and not re-raised.
        """
        # NOTE(review): self.day is interpolated directly into the SQL text; only
        # pass trusted, validated dates. The SQL text itself is kept exactly as
        # it was, hence the unindented lines inside the literal.
        sql = f"""SELECT
gameid,
catename,
cate,
title,
score,
tags,
reserve,
watch,
download,
sell,
review,
topic,
"#event_time"
FROM
v_event_25
WHERE
"$part_date"='{self.day}' """
        data = self.tga.get_data(sql)
        if not data:
            return

        body = []
        for line in data:
            # Guard each row separately: one malformed row must not discard
            # every row that follows it.
            try:
                (gameid, catename, cate, title, score, tag, reserve, watch,
                 download, sell, review, topic, createdat) = line
            except (TypeError, ValueError):
                log.error(f"split data failed ,{line}", exc_info=True)
                continue
            # NOTE(review): the event time is stored as the tag "catedat"; a
            # timestamp tag yields one series per distinct time — confirm intended.
            tags = {"gameid": str(gameid), "catename": catename, "cate": cate, "catedat": createdat}
            fields = {"title": title, "score": score, "tag": tag, "reserve": reserve, "watch": watch,
                      "download": download, "sell": sell, "review": review, "topic": topic}
            body.append({"measurement": self.measurement, "tags": tags, "fields": fields})

        if not body:
            # Every row was rejected; avoid sending an empty write request.
            return

        try:
            self.influx.write_points(body)
        except Exception:
            # Best-effort job: record the failure together with the payload.
            log.error(f"write 2 influx failed,data={body}", exc_info=True)

    def run(self) -> None:
        """Run the transfer for the configured day."""
        self.get_tga_data()
# Script entry: optional single argument is the day to load (YYYY-MM-DD);
# without exactly one argument, yesterday's date is used.
if __name__ == "__main__":
    if len(sys.argv) == 2:
        raw_day = sys.argv[1]
        try:
            # Validate the argument, then turn it back into the same
            # 'YYYY-MM-DD' string form the default branch produces, so the SQL
            # date filter does not receive a datetime with a time component.
            d = datetime.datetime.strptime(raw_day, "%Y-%m-%d").strftime("%Y-%m-%d")
        except ValueError as err:
            # Report the raw argument: `d` is not bound when parsing fails.
            log.error(f"split args with start time failed ,args={raw_day}", exc_info=True)
            raise Exception(f"split args with start time failed ,args={raw_day}") from err
    else:
        d = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    tt = TAP2Influx(d)
    tt.run()