119 lines
3.6 KiB
Python
119 lines
3.6 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding:utf-8 -*-
|
|
|
|
import pymongo
|
|
import re
|
|
from bson.json_util import loads # load MongoDB Extended JSON data
|
|
|
|
|
|
class MongodbBase:
|
|
|
|
def __init__(self, **args):
|
|
self.host = args.get('host')
|
|
try:
|
|
self.port = int(args.get('port'))
|
|
except Exception as e:
|
|
raise Exception("get args about port failed,{}".format(e))
|
|
self.dbname = args.get('dbname')
|
|
self.collname = args.get('collname', self.dbname)
|
|
|
|
try:
|
|
# 建立mongodb连接 选择对应的集合
|
|
self.client = pymongo.MongoClient(self.host, self.port)
|
|
self.db = self.client[self.dbname]
|
|
self.coll = self.db[self.collname]
|
|
except Exception as e:
|
|
raise ValueError(
|
|
'mongodb connect {}{}{}{} error,out was {1}'.format(self.host, self.port, self.db, self.coll, e))
|
|
|
|
def __del__(self):
|
|
self.client.close()
|
|
|
|
def select(self, myquery):
|
|
try:
|
|
data = self.coll.find(myquery={})
|
|
return data
|
|
except Exception as e:
|
|
print("get data error ,{}".format(e))
|
|
return None
|
|
|
|
# 查询
|
|
def query(self, qc=list()):
|
|
try:
|
|
if isinstance(qc, list):
|
|
rel = self.coll.aggregate(qc)
|
|
return rel
|
|
else:
|
|
raise ValueError("arg type is not list!")
|
|
except Exception as e:
|
|
return e
|
|
|
|
# 更新
|
|
def update(self, match_data, update_data):
|
|
try:
|
|
rel = self.coll.update_many(match_data, {"$set": update_data})
|
|
return "匹配到{0}条数据,修改{1}条数据".format(rel.matched_count, rel.modified_count)
|
|
except Exception as e:
|
|
return e
|
|
|
|
# 插入
|
|
def insert(self, inert_data):
|
|
try:
|
|
if isinstance(inert_data, list):
|
|
rel = self.coll.insert_many(inert_data)
|
|
return rel.inserted_ids
|
|
else:
|
|
raise ValueError("arg type is not list!")
|
|
except Exception as e:
|
|
return e
|
|
|
|
# 导入json数据
|
|
def insert_file(self, file):
|
|
try:
|
|
ids = self.coll.distinct('_id')
|
|
f = open(file, 'r', encoding='utf-8')
|
|
data = list()
|
|
tmp_data = list()
|
|
for i in f:
|
|
d = loads(i)
|
|
if d['_id'] in ids:
|
|
tmp_data.append(d['_id'])
|
|
else:
|
|
data.append(d)
|
|
if tmp_data:
|
|
return "数据中包含已存在ObjectId"
|
|
else:
|
|
rel = self.coll.insert_many(data)
|
|
return rel.acknowledged
|
|
except Exception as e:
|
|
return e
|
|
|
|
# 删除
|
|
def delete(self, del_data):
|
|
try:
|
|
rel = self.coll.delete_many(del_data)
|
|
return "{0}个条目已删除".format(rel.deleted_count)
|
|
except Exception as e:
|
|
return e
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = dict()
|
|
args['host'] = '127.0.0.1'
|
|
# args['host'] = '10.10.5.4'
|
|
args['port'] = 27017
|
|
# args['dbname'] = 'dalmatian-production'
|
|
args['dbname'] = 'test'
|
|
# args['collname'] = 'redemption_record'
|
|
match_data = {'ppp': '99999999'}
|
|
update_data = {'ppp': '7777777'}
|
|
|
|
# 查询条件
|
|
qc = [{"$addFields": {"ouid": {"$toObjectId": "$uid"}}},
|
|
{'$lookup': {'from': 'users', 'localField': 'ouid', 'foreignField': '_id', 'as': 'userinfo'}}]
|
|
m = MongodbBase(**args)
|
|
# r = m.update(match_data, update_data)
|
|
r = m.query()
|
|
ii = m.insert_file(file='random_users.json')
|
|
print(ii) # for i in r: # print(i)
|