添加tap 2 influxdb脚本
This commit is contained in:
parent
ab340f4574
commit
221cf8032d
77
taptap/tap2influxdb.py
Normal file
77
taptap/tap2influxdb.py
Normal file
@ -0,0 +1,77 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import pdb
|
||||
from influxdb import InfluxDBClient, exceptions
|
||||
from ops.mtga import FromTga
|
||||
from ops.plog import define_logger
|
||||
import logging
|
||||
import datetime
|
||||
import sys
|
||||
|
||||
define_logger("/data/logs/ops/tap2influxdb.log")
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TAP2Influx:
|
||||
def __init__(self, day):
|
||||
url = "http://10.10.3.17:8992/querySql"
|
||||
api_secret = "n9H4R32ZcjtSeN89ljCY6ESzTmOlnwwnOB3r4YsggnP5M1AXLtKtiS4sS1KKLOEQ"
|
||||
self.tga = FromTga(url=url, token=api_secret)
|
||||
self.day = day
|
||||
self.influx = InfluxDBClient('10.10.3.19', '8086', 'miles', 'aspect', 'tap_data')
|
||||
self.measurement = "tapdata"
|
||||
|
||||
def get_tga_data(self):
|
||||
sql = f"""SELECT
|
||||
gameid,
|
||||
catename,
|
||||
cate,
|
||||
title,
|
||||
score,
|
||||
tags,
|
||||
reserve,
|
||||
watch,
|
||||
download,
|
||||
sell,
|
||||
review,
|
||||
topic,
|
||||
"#event_time"
|
||||
FROM
|
||||
v_event_25
|
||||
WHERE
|
||||
"$part_date"='{self.day}' """
|
||||
|
||||
body = list()
|
||||
data = self.tga.get_data(sql)
|
||||
if data:
|
||||
try:
|
||||
for line in data:
|
||||
gameid, catename, cate, title, score, tags, reserve, watch, download, sell, review, topic, createdat = line
|
||||
tags = {"gameid": gameid, "catename": catename, "cate": cate, "catedat": createdat}
|
||||
fields = {"title": title, "score": score, "tags": tags, "reserve": reserve, "watch": watch,
|
||||
"download": download, "sell": sell, "review": review, "topic": topic}
|
||||
temp = {"measurement": self.measurement, "tags": tags, "fields": fields}
|
||||
body.append(temp)
|
||||
except Exception:
|
||||
log.error(f"split data failed ,{line}", exc_info=True)
|
||||
try:
|
||||
self.influx.write(body)
|
||||
except Exception:
|
||||
log.error(f"write 2 influx failed,data={body}", exc_info=True)
|
||||
|
||||
|
||||
def run(self):
|
||||
self.get_tga_data()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) == 2:
|
||||
try:
|
||||
d = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d")
|
||||
except Exception:
|
||||
log.error(f"split args with start time failed ,args={d}", exc_info=True)
|
||||
raise Exception(f"split args with start time failed ,args={d}")
|
||||
else:
|
||||
d = datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
tt = TAP2Influx(d)
|
||||
tt.run()
|
Loading…
x
Reference in New Issue
Block a user