添加微信和百度审核接口

This commit is contained in:
zhl 2021-06-17 12:01:51 +08:00
parent b212e6f968
commit 9bc177d2a5
9 changed files with 199 additions and 10 deletions

0
lib/__init__.py Normal file
View File

0
lib/service/__init__.py Normal file
View File

76
lib/service/baidu.py Normal file
View File

@ -0,0 +1,76 @@
# encoding:utf-8
import asyncio
import urllib.parse
import urllib.request
from urllib.request import Request, urlopen
from random import random
from urllib.error import URLError
import json
cates = ['暴恐违禁', '文本色情', '政治敏感', '恶意推广', '低俗辱骂', '低质灌水']
def random_int(start, end):
return int(random() * (end - start) + start)
def get_ip():
return '%d.%d.%d.%d' % (random_int(1, 254), random_int(1, 254), random_int(1, 254), random_int(1, 254))
async def check_baidu(txt):
url = 'https://ai.baidu.com/aidemo'
num1 = int(random() * 10 + 70)
num2 = int(random() * 100)
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_0_1) AppleWebKit/537.36 (KHTML, like Gecko) ' \
'Chrome/%d.0.4324.%d Safari/537.36' % (num1, num2)
headers = {'User-Agent': user_agent,
'Content-Type': 'application/x-www-form-urlencoded',
'referer': 'https://ai.baidu.com/tech/textcensoring',
'X-Forwarded-For': get_ip()
}
values = {
'content': txt,
'type': 'textcensor',
'apiType': 'censor'
}
data = urllib.parse.urlencode(values)
data = data.encode('ascii')
req = Request(url, data, headers)
try:
response = urlopen(req)
except URLError as e:
if hasattr(e, 'reason'):
print('We failed to reach a server.')
print('Reason: ', e.reason)
return {'errcode': 100, 'errmsg': e.reason}
elif hasattr(e, 'code'):
print('The server couldn\'t fulfill the request.')
print('Error code: ', e.code)
return {'errcode': 100, 'errmsg': e.code}
else:
res = json.loads(response.read().decode('utf-8'))
print(res)
if res['errno'] != 0:
return {'errcode': 100, 'errmsg': res['msg']}
result_data = res['data']['result']
reasons = []
risk = 0
if 'review' in result_data and len(result_data['review']) > 0:
risk = 1
review_data = result_data['review']
for d in review_data:
reasons.append((cates[d['label'] - 1], d['score']))
if 'reject' in result_data and len(result_data['reject']) > 0:
risk = 1
reject_data = result_data['reject']
for d in reject_data:
reasons.append((cates[d['label'] - 1], d['score']))
return {'errcode': 0, 'risk': risk, 'reason': reasons}
async def main():
await check_baidu('私人侦探修宪')
# asyncio.run(main())

10
lib/service/local.py Normal file
View File

@ -0,0 +1,10 @@
from lib.text_filter import TextFilter
def check_local(txt):
t = TextFilter()
result = t.is_contain(txt)
risk = 0
if len(result) > 0:
risk = 1
return {'errcode': 0, 'risk': risk, 'reason': result}

60
lib/service/wechat.py Normal file
View File

@ -0,0 +1,60 @@
# encoding:utf-8
import asyncio
import json
from urllib.error import URLError
from urllib.request import Request, urlopen
import requests
from lib.service.wechat_token import WechatToken
APPID = 'wxf8c3da4e7dfe00a2'
APP_SECRET = '8c0a1e88a6b43e4be80ed6a597c0b047'
async def refreshToken(appid, app_secret, refresh=False):
util = WechatToken()
token = util.get_token(appid)
if token is not None and not refresh:
return token
url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s' % (
appid, app_secret)
try:
response = urlopen(url)
except URLError as e:
if hasattr(e, 'reason'):
print('We failed to reach a server.')
print('Reason: ', e.reason)
elif hasattr(e, 'code'):
print('The server couldn\'t fulfill the request.')
print('Error code: ', e.code)
return None
else:
res = json.loads(response.read().decode('utf-8'))
util.update_token(appid, res['access_token'], res['expires_in'])
print(res)
return res['access_token']
async def msg_sec_check(content):
token = await refreshToken(APPID, APP_SECRET)
url = 'https://api.weixin.qq.com/wxa/msg_sec_check?access_token=%s' % token
data = {
'content': content.encode("utf-8").decode("latin1")
}
headers = {'content-type': 'application/json'}
r = requests.post(url, data=json.dumps(data, ensure_ascii=False), headers=headers)
if r.status_code != 200:
return {'errcode': 100, 'errmsg': r.status_code}
rep_data = r.json()
print(rep_data)
if rep_data['errcode'] == 87014:
result = 1
else:
result = 0
return {'errcode': 0, 'risk': result, 'reason': rep_data['errmsg']}
async def main():
res = await msg_sec_check('特3456书yuuo莞6543李zxcz蒜7782法fgnv级')
print(res)

View File

@ -0,0 +1,28 @@
# encoding:utf-8
import time
class WechatToken(object):
# Singleton
_instance = None
_cache = {}
def __new__(cls, *args, **kw):
"""单例模式"""
if not cls._instance:
cls._instance = super(WechatToken, cls).__new__(cls, *args, **kw)
return cls._instance
def get_token(self, appid):
data = self._cache.get(appid)
if data is None:
return None
now = round(time.time())
if data[1] - now < 1200:
return None
return data[0]
def update_token(self, appid, token, exptime):
now = round(time.time()) + exptime
data = (token, now)
self._cache[appid] = data

View File

@ -26,7 +26,7 @@
import re
from collections import Counter
from sensitive_word import SensitiveWords
from lib.sensitive_word import SensitiveWords
class Node(object):

31
main.py
View File

@ -1,12 +1,27 @@
# encoding:utf-8
import flask
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
app = flask.Flask(__name__)
from lib.service.baidu import check_baidu
from lib.service.local import check_local
from lib.service.wechat import msg_sec_check
if __name__ == '__main__':
app.run(
host='0.0.0.0',
port=8081,
debug=True
)
app = FastAPI()
class Item(BaseModel):
content: str
@app.post("/check")
async def check_content(item: Item):
# res_wechat = await msg_sec_check(item.content)
# res_baidu = await check_baidu(item.content)
res_local = check_local(item.content)
return res_local
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8009)

View File

@ -1 +1 @@
flask
fastapi~=0.65.2