#!/usr/bin/env python # -*- coding:utf-8 -*- import pymongo import re from bson.json_util import loads # load MongoDB Extended JSON data class MongodbBase: def __init__(self, **args): self.host = args.get('host') try: self.port = int(args.get('port')) except Exception as e: raise Exception("get args about port failed,{}".format(e)) self.dbname = args.get('dbname') self.collname = args.get('collname', self.dbname) try: # 建立mongodb连接 选择对应的集合 self.client = pymongo.MongoClient(self.host, self.port) self.db = self.client[self.dbname] self.coll = self.db[self.collname] except Exception as e: raise ValueError( 'mongodb connect {}{}{}{} error,out was {1}'.format(self.host, self.port, self.db, self.coll, e)) def __del__(self): self.client.close() def select(self, myquery): try: data = self.coll.find(myquery={}) return data except Exception as e: print("get data error ,{}".format(e)) return None # 查询 def query(self, qc=list()): try: if isinstance(qc, list): rel = self.coll.aggregate(qc) return rel else: raise ValueError("arg type is not list!") except Exception as e: return e # 更新 def update(self, match_data, update_data): try: rel = self.coll.update_many(match_data, {"$set": update_data}) return "匹配到{0}条数据,修改{1}条数据".format(rel.matched_count, rel.modified_count) except Exception as e: return e # 插入 def insert(self, inert_data): try: if isinstance(inert_data, list): rel = self.coll.insert_many(inert_data) return rel.inserted_ids else: raise ValueError("arg type is not list!") except Exception as e: return e # 导入json数据 def insert_file(self, file): try: ids = self.coll.distinct('_id') f = open(file, 'r', encoding='utf-8') data = list() tmp_data = list() for i in f: d = loads(i) if d['_id'] in ids: tmp_data.append(d['_id']) else: data.append(d) if tmp_data: return "数据中包含已存在ObjectId" else: rel = self.coll.insert_many(data) return rel.acknowledged except Exception as e: return e # 删除 def delete(self, del_data): try: rel = self.coll.delete_many(del_data) return "{0}个条目已删除".format(rel.deleted_count) except Exception as e: return e if __name__ == "__main__": args = dict() args['host'] = '127.0.0.1' # args['host'] = '10.10.5.4' args['port'] = 27017 # args['dbname'] = 'dalmatian-production' args['dbname'] = 'test' # args['collname'] = 'redemption_record' match_data = {'ppp': '99999999'} update_data = {'ppp': '7777777'} # 查询条件 qc = [{"$addFields": {"ouid": {"$toObjectId": "$uid"}}}, {'$lookup': {'from': 'users', 'localField': 'ouid', 'foreignField': '_id', 'as': 'userinfo'}}] m = MongodbBase(**args) # r = m.update(match_data, update_data) r = m.query() ii = m.insert_file(file='random_users.json') print(ii) # for i in r: # print(i)