- 安装:
pip install mongoengine
- 连接mongodb数据库
当连接数据库时,需要使用connect()函数,下面将介绍不同情况下的连接参数。
1. 默认情况下,指mongod运行在localhost且端口为27017
from mongoengine import connect
# 连接到User数据库
connect('user')
2. mongod运行在其它服务器且指定端口时,默认端口为27017时可以不写
from mongoengine import connect
# 连接到User数据库
connect('user', host='192.168.124.104', port=27017)
3. 需要用户登录验证时
from mongoengine import connect
# 连接到User数据库
connect('user', host='192.168.124.49', port=27017, username='root', password='88888888', authentication_source='admin')
4. 使用URL连接方式
from mongoengine import connect
# 连接到User数据库
connect('user', host='mongodb://root:88888888@192.168.124.49:27017/?authSource=admin')
注意:如果URL中包含数据库名称、用户名和密码时,将会覆盖响应参数的传值。下面代码中,将会连接test-movie数据库,用户名为root,密码为88888888。
from mongoengine import connect
# 连接到test-movie数据库
connect('user', username='test', password='123456', host='mongodb://root:88888888@192.168.124.49:27017/test-movie?authSource=admin')
5. 连接副本集
from mongoengine import connect
# 连接到dbname数据库
# 方式一
connect('dbname', replicaset='re-name')
# 方式二
connect(host='mongodb://root:88888888@192.168.124.49:27017/dbname?authSource=admin&replicaSet=rs-name')
6. 多个数据库进行连接
可以使用传参的形式,并可以设置别名参数”alias“,也可以使用URL方式。
二、MongoEngine模型介绍 2.1、ODM模型介绍from mongoengine import Document, StringField
class User(Document):
email = StringField(required=True)
first_name = StringField(max_length=50)
last_name = StringField(max_length=50)
2.2、常见数据类型
- db_field:文档中的field/域/列名称
- required:是否为必填项
- default:默认值
- unique:唯一性约束
- choices:可选择的列表
- primary_key:是否为文档的主键,默认为False
-
类属性,其配置项为python的dict(字典)
from mongoengine import Document, StringField
class User(Document):
email = StringField(required=True)
first_name = StringField(max_length=50)
last_name = StringField(max_length=50)
meta = {
}
常见的配置项:
- db_alias:指定文档所在的数据库(逻辑库)
- collection:指定文档所在的集合
- ordering:指定文档的默认排序规则
- indexes:指定文档的索引规则
学生信息数据字典
文档的嵌套场景
情况一,数组-简单数据类型:{'grades': [76, 51, 84]}
from mongoengine import Document, IntField, ListField
class Student(Document):
grade = ListField(IntField())
情况二,单个文档:{'grade': {'course_name': '语文', 'score': 76}}
from mongoengine import Document, StringField, IntField, ListField, EmbeddedDocument, EmbeddedDocumentField
# 自定义类型
class CourseGrade(EmbeddedDocument):
course_name = StringField()
score = IntField()
class Student(Document):
grade = EmbeddedDocumentField(CourseGrade)
情况三,数组-文档:{'grades': [{'score': 76}, {'score': 51}]}
from mongoengine import Document, IntField, ListField, EmbeddedDocument, EmbeddedDocumentField
# 自定义类型
class CourseGrade(EmbeddedDocument):
score = IntField()
class Student(Document):
grade = ListField(EmbeddedDocumentField(CourseGrade))
完整代码:
from mongoengine import Document, connect, EnumField, StringField, IntField, ListField, EmbeddedDocument, \
EmbeddedDocumentField
from enum import Enum
# 连接到User数据库
connect('user', host='192.168.124.104', port=27017)
class SexEnum(Enum):
MAN = '男'
WOMAN = '女'
class CourseGrade(EmbeddedDocument):
"""成绩信息(科目、老师、成绩)-被嵌套的文档"""
course_name = StringField(max_length=64, required=True, verbose_name='科目')
teacher = StringField(max_length=16, verbose_name='老师')
score = IntField(min_value=0, max_value=150, required=True, verbose_name='成绩')
def __repr__(self):
return f"CourseGrade({self.course_name},{self.score})"
def __str__(self):
return self.__repr__()
class Student(Document):
"""学生信息"""
# verbose_name 自定义参数,用于显示定义域的名称
stu_no = IntField(required=True, unique=True, verbose_name='学号')
stu_name = StringField(required=True, max_length=16, verbose_name='姓名')
sex = EnumField(enum=SexEnum, verbose_name='性别')
class_name = StringField(max_length=10, verbose_name='班级')
address = StringField(max_length=255, verbose_name='家庭住址')
phone_no = StringField(max_length=11, verbose_name='电话号码')
age = IntField(min_value=0, max_value=150, verbose_name='年龄')
grades = ListField(EmbeddedDocumentField(CourseGrade), verbose_name='成绩数组')
meta = {
# 指定文档的集合
'collection': 'students',
# 指定排序,可以指定多个域。例如:'age':根据年龄升序,'-age':根据年龄降序
'ordering': ['-age']
}
def __repr__(self):
return f'Grade({self.stu_no}, {self.stu_name})'
def __str__(self):
return self.__repr__()
class Grade(Document):
"""学生成绩"""
# verbose_name 自定义参数,用于显示定义域的名称
stu_no = IntField(required=True, verbose_name="学号")
stu_name = StringField(required=True, max_length=16, verbose_name='姓名')
sex = EnumField(enum=SexEnum, verbose_name='性别')
class_name = StringField(max_length=10, verbose_name='班级')
address = StringField(max_length=255, verbose_name='家庭住址')
phone_no = StringField(max_length=11, verbose_name='电话号码')
age = IntField(min_value=0, max_value=150, verbose_name='年龄')
grade = EmbeddedDocumentField(CourseGrade, verbose_name='成绩')
meta = {
# 指定文档的集合
'collection': 'grades',
# 指定排序,可以指定多个域。例如:'age':根据年龄升序,'-age':根据年龄降序
'ordering': ['-age']
}
def __repr__(self):
return f'Grade({self.stu_no}, {self.stu_name})'
def __str__(self):
return self.__repr__()
三、添加数据
添加数据一般有两种方式:
3.1、方式一一般步骤:
- 第一步,构造ODM模型类对象:
user=User(username='张三')
- 第二步,验证数据:
user.validate()
- 第三步,保存数据:
user.save()
模型中的验
-
内置的验证器,如
max_length, min_value
-
自定义验证器
模型中自定义验证方法示例代码如下:
import re
from mongoengine import StringField
from mongoengine.errors import ValidationError
def phone_required(value):
pattern = r'^1[0-9][10]$'
if not re.search(pattern, value):
raise ValidationError('请输入正确的手机号')
phone_no = StringField(validation=phone_required)
方式一示例代码:
import random
from model import Student, Grade, SexEnum, CourseGrade
class LearnMongoDBEngine(object):
def __init__(self, info):
self.info = info
print(self.info)
def add_one_student(self):
"""新增一个学生信息"""
student = Student(
stu_no=random.randint(3000, 9999999),
stu_name=self.info['name'],
sex=random.choice([SexEnum.MAN, SexEnum.WOMAN]),
class_name='六年级三班',
address=self.info['address'],
phone_no=self.info['phone'],
age=random.randint(10, 15),
grades=[
CourseGrade(course_name='语文', teacher=self.info['teacher'], score=random.randint(1, 100)),
CourseGrade(course_name='数学', teacher=self.info['teacher'], score=random.randint(1, 100)),
CourseGrade(course_name='英语', teacher=self.info['teacher'], score=random.randint(1, 100)),
]
)
print(student, student.grades)
result = student.save()
return result
if __name__ == '__main__':
info = {'name': '铁扇公主', 'address': '北京市朝阳区', 'phone': '19121741234', 'teacher': '王老师'}
obj = LearnMongoDBEngine(info)
obj.add_one_student()
mongodb数据库中插入的数据:
User.objects.create(**kwargs)
示例代码:
import random
from model import Student, Grade, SexEnum, CourseGrade
class LearnMongoDBEngine(object):
def __init__(self, info):
self.info = info
print(self.info)
def add_one_student(self):
"""新增一个学生信息"""
result = Student.objects.create(
stu_no=random.randint(3000, 9999999),
stu_name=self.info['name'],
sex=random.choice([SexEnum.MAN, SexEnum.WOMAN]),
class_name='六年级三班',
address=self.info['address'],
phone_no=self.info['phone'],
age=random.randint(10, 15),
grades=[
CourseGrade(course_name='语文', teacher=self.info['teacher'], score=random.randint(1, 100)),
CourseGrade(course_name='数学', teacher=self.info['teacher'], score=random.randint(1, 100)),
CourseGrade(course_name='英语', teacher=self.info['teacher'], score=random.randint(1, 100)),
]
)
# student = Student(
# stu_no=random.randint(3000, 9999999),
# stu_name=self.info['name'],
# sex=random.choice([SexEnum.MAN, SexEnum.WOMAN]),
# class_name='六年级三班',
# address=self.info['address'],
# phone_no=self.info['phone'],
# age=random.randint(10, 15),
# grades=[
# CourseGrade(course_name='语文', teacher=self.info['teacher'], score=random.randint(1, 100)),
# CourseGrade(course_name='数学', teacher=self.info['teacher'], score=random.randint(1, 100)),
# CourseGrade(course_name='英语', teacher=self.info['teacher'], score=random.randint(1, 100)),
# ]
# )
print(result, result.grades)
# result = student.save()
return result
if __name__ == '__main__':
info = {'name': '卢俊义', 'address': '上海市浦东新区', 'phone': '18721741234', 'teacher': '张老师'}
obj = LearnMongoDBEngine(info)
obj.add_one_student()
mongodb数据库中插入的数据:
结果集QuerySet的获取:User.objects
,User是模型对象
常用的查询方法:
- all():查询所有文档
- filter():按照条件查询
- count():满足条件的文档数
- sum()/average():求和/求平均数
- order_by():排序
- .skip().limit():分页
- first():没有文档则返回None,
User.objects.first()
- get(**kwargs) 多个文档时,异常:MultipleObjectsReturned 没有文档时,异常:DoesNotExist 仅有一个文档时:返回ODM对象
- only():返回需要的字段
示例代码:
from model import Student
class LearnMongoDBEngine(object):
def get_one_student(self):
"""查询一个学生的信息"""
student_info = Student.objects.first()
print(student_info, student_info.id, student_info.stu_name, student_info.address)
return student_info
def get_all_student(self):
"""查询所有学生的信息"""
student_all_info = Student.objects.all()
print(student_all_info)
return student_all_info
def get_student_by_id(self, pk: str):
"""根据学生的id查询"""
student_info_id = Student.objects.get(id=pk)
print(student_info_id)
return student_info_id
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.get_one_student()
obj.get_all_student()
obj.get_student_by_id('62dcd1f1a0da9e5521e73223')
运行结果:
当只想检索一条document的某些字段,可以使用only(),把需要检索的字段作为参数,没有选中的字段则会返回默认值。
示例代码:
from model import Student
class LearnMongoDBEngine(object):
def get_one_student(self):
"""查询一个学生的信息"""
student_info = Student.objects.first()
print(student_info, student_info.id, student_info.stu_name, student_info.address)
return student_info
def get_all_student(self):
"""查询所有学生的信息"""
student_all_info = Student.objects.all()
print(student_all_info)
return student_all_info
def get_all_student_only(self):
"""查询学生的某个字段信息"""
student_all_info = Student.objects.only('stu_no').first()
print(student_all_info.stu_no, student_all_info.stu_name)
student_all_info = Student.objects.only('stu_no', 'stu_name').first()
print(student_all_info.stu_no, student_all_info.stu_name)
return student_all_info
if __name__ == '__main__':
# 自我测试代码
obj = LearnMongoDBEngine()
aa = obj.get_one_student()
print(aa)
print("*" * 100)
bb = obj.get_all_student()
print(bb)
print("*" * 100)
cc = obj.get_all_student_only()
print(cc)
运行结果:
1)比较运算符
在MongoEngine中使用双下划线(__)分割。比如:age__gt=12
,表示年龄大于12的学生信息。
2)MongoEngine中的字符串查询 【i表示不区分大小写】
3)多个条件组合查询
- Q函数的使用:from mongoengine.queryset.visitor import Q
- 多个条件同时满足:Student.objects.filter(Q(key1=value1) & Q(key2=value2))
- 多个条件部分满足:Student.objects.filter(Q(key1=value1) | Q(key2=value2))
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum
class LearnMongoDBEngine(object):
def get_one_student(self):
"""查询一个学生的信息"""
student_info = Student.objects.first()
print(student_info, student_info.id, student_info.stu_name, student_info.address)
return student_info
def get_all_student(self):
"""查询所有学生的信息"""
student_all_info = Student.objects.all()
print(student_all_info)
return student_all_info
def get_student_by_id(self, pk: str):
"""根据学生的id查询"""
student_info_id = Student.objects.get(id=pk)
print(student_info_id)
return student_info_id
def get_student_1(self):
"""获取大于12岁的学生信息"""
result = Student.objects.filter(age__gt=12)
print(result)
return result
def get_student_2(self):
"""获取所有姓李的学生"""
result = Student.objects.filter(stu_name__startswith='李')
print(result)
return result
def get_student_3(self):
"""查询年龄在9~12之间(含)的学生信息"""
# SELECT * FROM school_student_info WHERE age BETWEEN 9 AND 12;
# db.students.find({'age': {'$gte': 9, '$lte': 12}})
result = Student.objects.filter(Q(age__gte=9) & Q(age__lte=12))
print(result)
return result
def get_student_4(self):
"""查询所有12岁以上的男生和9岁以下的女生"""
result = Student.objects.filter(Q(age__gt=12, sex=SexEnum.MAN) | Q(age__lt=9, sex=SexEnum.WOMAN))
print(result)
return result
def get_grade(self):
"""查询大于等于80分的学生成绩信息"""
result = Student.objects.filter(grades__score__gte=80) # 注意这儿写法
print(result)
for i in result:
print(i.grades)
return result
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.get_one_student()
obj.get_student_1()
obj.get_student_2()
obj.get_student_3()
obj.get_student_4()
obj.get_grade()
运行结果: 【注意:打印信息是由model中__call__函数设置决定的】
对于嵌套列表中数据的查询时,需要指定条件为列表的第几列时,可以在条件中指定第几列的。如:Student.objects.filter(grades__0__score__gte=80)
示例代码:
from model import Student, Grade, SexEnum, CourseGrade
class LearnMongoDBEngine(object):
def __init__(self, info):
self.info = info
print(self.info)
def get_grade_2(self):
"""查询大于等于80分的学生成绩信息"""
result = Student.objects.filter(grades__score__gte=80) # 注意这儿写法
print(result)
for i in result:
print(i.grades)
print("*" * 100)
"""查询语文成绩大于等于80分的学生成绩信息"""
result = Student.objects.filter(grades__0__score__gte=80) # 注意:这儿0代表嵌套列表中的第一列
print(result)
for i in result:
print(i.grades)
return result
if __name__ == '__main__':
# 自我测试代码
obj = LearnMongoDBEngine('info')
obj.get_grade_2()
运行结果:
可以直接在objects()里面直接写查询限制条件,而不再使用filter() ,如:Student.objects(age__gt=11).all()
示例代码:
import random
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum, CourseGrade
class LearnMongoDBEngine(object):
def __init__(self, info):
self.info = info
print(self.info)
def get_all_student2(self):
"""查询所有学生的信息,并测试在objects里面填写查询条件"""
student_all_info = Student.objects.all()
print(student_all_info, student_all_info.count())
query = None
student_all_info2 = Student.objects(query).all()
print(student_all_info2, student_all_info2.count())
student_all_info2 = Student.objects(age__gt=11).all()
print(student_all_info2, student_all_info2.count())
return student_all_info
if __name__ == '__main__':
# 自我测试代码
obj = LearnMongoDBEngine('info')
obj.get_all_student2()
运行结果:
- 满足条件的文档数:
User.objects.count()
,所有的结果集都可以使用 - 求和/平均数:
User.objects.filter().sum(field) / User.objects.filter().average(field)
aggrate()聚合方法,使用pipeline()管道
示例代码1:
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum
class LearnMongoDBEngine(object):
def get_student_4(self):
"""查询所有12岁以上的男生和9岁以下的女生"""
result = Student.objects.filter(Q(age__gt=12, sex=SexEnum.MAN) | Q(age__lt=9, sex=SexEnum.WOMAN))
print(result)
return result
def get_student_5(self):
"""查询所有12岁以上的男生和9岁以下的女生总数"""
result = Student.objects.filter(Q(age__gt=12, sex=SexEnum.MAN) | Q(age__lt=9, sex=SexEnum.WOMAN)).count()
print(result)
return result
def get_man_sex(self):
"""统计性别为男的总人数,并求出其平均年龄和总年龄"""
queryset = Student.objects.filter(sex='男')
print(queryset)
man_num_count = queryset.count()
print(man_num_count)
man_avg_age = queryset.average('age')
print(man_avg_age)
man_sum_age = queryset.sum('age')
print(man_sum_age)
return man_avg_age, man_sum_age
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.get_student_4()
obj.get_student_5()
obj.get_man_sex()
运行结果:
示例代码2:
from model import Student
class LearnMongoDBEngine(object):
def get_man_sex(self):
"""统计性别为男的总人数,并求出其平均年龄和总年龄"""
queryset = Student.objects.filter(sex='男')
print(queryset)
man_num_count = queryset.count()
print(man_num_count)
man_avg_age = queryset.average('age')
print(man_avg_age)
man_sum_age = queryset.sum('age')
print(man_sum_age)
return man_avg_age, man_sum_age
def get_man_age_total_agg(self):
"""使用聚合管道获得男性年龄总和"""
pipeline = [
{
"$match": {
"sex": '男'
}
},
{
"$group": {
"_id": None,
"total": {
"$sum": '$age'
}
}
}
]
# man_sum_age_agg = Student.objects.filter().aggregate(pipeline)
man_sum_age_agg = Student.objects.aggregate(pipeline)
print(man_sum_age_agg) # 返回的是一个对象
for data in man_sum_age_agg:
print(data)
if __name__ == '__main__':
# 自我测试代码
obj = LearnMongoDBEngine()
aa = obj.get_man_sex()
print(aa)
print("*" * 100)
obj.get_man_age_total_agg()
运行结果:
- -:倒叙排列
- (+):正序排序,默认就是正序
Student.objects().order_by('field1', '-field2')
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum
class LearnMongoDBEngine(object):
def get_max_min_age(self):
"""获取最大年龄和最小年龄的学生信息"""
queryset_min_age = Student.objects.order_by('age').first()
queryset_max_age = Student.objects.order_by('-age').first()
print(queryset_min_age, queryset_min_age.age)
print(queryset_max_age, queryset_max_age.age)
return '200 OK'
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.get_max_min_age()
运行结果:
- 方式一,切片方式:
User.objects.all()[10:15]
- 方式二,.skip().limit():
User.objects.skip(10).limit(5)
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum
class LearnMongoDBEngine(object):
def paginate(self, page: int = 1, page_size: int = 5):
"""
分页处理
:param page: 当前是第几页
:param page_size: 每页有多少数据
:return:
"""
# 方法一:切片
start = (page - 1) * page_size
end = start + page_size
queryset1 = Student.objects.all()[start:end]
print(queryset1)
# 方法二:skip().limit()
queryset2 = Student.objects.skip(start).limit(page_size)
print(queryset2)
return "200 OK"
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.paginate(1, 6)
print("*" * 100)
obj.paginate(1, 3)
obj.paginate(2, 3)
运行结果:
修改数据时一般先过滤数据,再修改
- 修改一条数据:
User.objects.filter().update_one()
- 修改一条数据:User._get_db()['集合名称'].update_one({},{}) 局部替换
- 修改一条数据:User()._get_collection().update_one({},{}) 局部替换
- 修改一条数据:User._get_db()['集合名称'].replace_one({},{}) 整体替换
- 修改一条数据:User()._get_collection().replace_one({},{}) 整体替换
- 批量修改数据:
User.objects.filter().update()
示例代码1:
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum
class LearnMongoDBEngine(object):
def update_one(self):
"""修改一条数据"""
queryset = Student.objects.filter(stu_no='2438197')
print(queryset)
result = queryset.update_one(stu_name='白龙马', phone_no='16452412564')
# result = queryset.update_one(stu_name='白龙马', unset__phone_no=True)
print(result)
def update_one_2(self):
"""修改一条数据"""
queryset = Student.objects.filter(stu_no=3152784).first()
print(queryset)
if queryset:
queryset.stu_name = '沙和尚'
result = queryset.save()
print(result)
return "200 OK"
else:
return "error"
def update_many(self):
"""将年龄10岁的学生年龄加一岁"""
queryset = Student.objects.filter(age=10)
print(queryset)
queryset.update(inc__age=1)
print(queryset)
return "200 OK"
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.update_one()
obj.update_one_2()
obj.update_many()
运行结果:
示例代码2: 【User._get_db()['集合名称'].update_one({},{}) 局部替换】
from model import Student
student = Student()
# aa = student._get_collection().find({'stu_name': '王五'})
aa = student._get_collection().find() # 默认全查询
print(aa)
for i in aa:
print(i)
# 使用update_one进行局部更新,students为集合的名称
# 只需要放需要更新的字段,注意使用$进行操作
bb = Student._get_db()['students'].update_one({'stu_name': '王五'}, {"$set": {'stu_no': 888888, 'age': 36}})
print(bb)
cc = student._get_collection().find() # 默认全查询
print(cc)
for i in cc:
print(i)
运行结果:
示例代码3: 【User()._get_collection().update_one({},{}) 局部替换】
from model import Student
student = Student()
# aa = student._get_collection().find({'stu_name': '王五'})
aa = student._get_collection().find() # 默认全查询
print(aa)
for i in aa:
print(i)
# 使用update_one进行局部更新,students为集合的名称
# 只需要放需要更新的字段,注意使用$进行操作
bb = student._get_collection().update_one({'stu_name': '王五'}, {"$set": {'stu_no': 999999, 'age': 21}})
print(bb)
cc = student._get_collection().find() # 默认全查询
print(cc)
for i in cc:
print(i)
运行结果:
示例代码4: 【User._get_db()['集合名称'].replace_one({},{}) 整体替换】
from model import Student
student = Student()
# aa = student._get_collection().find({'stu_name': '王五'})
aa = student._get_collection().find() # 默认全查询
print(aa)
for i in aa:
print(i)
# 使用replace_one进行整体替换,students为集合的名称
# replace_one要求是所有数据,少一个key-value则报错
doument = {'stu_no': 555555, 'stu_name': '张三', 'address': '山东省青岛市', 'phone_no': '12345678910', 'age': 25}
bb = Student._get_db()['students'].replace_one({'stu_name': '张三'}, doument, upsert=True)
print(bb)
cc = student._get_collection().find() # 默认全查询
print(cc)
for i in cc:
print(i)
运行结果:
示例代码5: 【User()._get_collection().replace_one({},{}) 整体替换】
from model import Student
student = Student()
# aa = student._get_collection().find({'stu_name': '王五'})
aa = student._get_collection().find() # 默认全查询
print(aa)
for i in aa:
print(i)
# 使用replace_one进行整体替换,students为集合的名称
# replace_one要求是所有数据,少一个key-value则报错
doument = {'stu_no': 666666, 'stu_name': '张三', 'address': '山东省青岛市', 'phone_no': '12345678910', 'age': 26}
bb = student._get_collection().replace_one({'stu_name': '张三'}, doument, upsert=True)
print(bb)
cc = student._get_collection().find() # 默认全查询
print(cc)
for i in cc:
print(i)
运行结果:
- User.objects.filter().delete()
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum
class LearnMongoDBEngine(object):
def delete_data(self):
"""删除年龄大于13岁的学生"""
queryset_start = Student.objects.all()
print(f"删除前学生总数量:{queryset_start.count()}")
queryset = Student.objects.filter(age__gt=13)
print(f'删除的学生的数量:{queryset.count()}')
res = queryset.delete()
print(f"删除的结果:{res}")
queryset_end = Student.objects.all()
print(f"删除后剩余学生总数量:{queryset_end.count()}")
if __name__ == '__main__':
obj = LearnMongoDBEngine()
obj.delete_data()
运行结果:
附录:
main.py
import random
from mongoengine.queryset.visitor import Q
from model import Student, Grade, SexEnum, CourseGrade
class LearnMongoDBEngine(object):
def __init__(self, info):
self.info = info
print(self.info)
def add_one_student(self):
"""新增一个学生信息"""
result = Student.objects.create(
stu_no=random.randint(3000, 9999999),
stu_name=self.info['name'],
sex=random.choice([SexEnum.MAN, SexEnum.WOMAN]),
class_name='六年级三班',
address=self.info['address'],
phone_no=self.info['phone'],
age=random.randint(10, 15),
grades=[
CourseGrade(course_name='语文', teacher=self.info['teacher'], score=random.randint(1, 100)),
CourseGrade(course_name='数学', teacher=self.info['teacher'], score=random.randint(1, 100)),
CourseGrade(course_name='英语', teacher=self.info['teacher'], score=random.randint(1, 100)),
]
)
# student = Student(
# stu_no=random.randint(3000, 9999999),
# stu_name=self.info['name'],
# sex=random.choice([SexEnum.MAN, SexEnum.WOMAN]),
# class_name='六年级三班',
# address=self.info['address'],
# phone_no=self.info['phone'],
# age=random.randint(10, 15),
# grades=[
# CourseGrade(course_name='语文', teacher=self.info['teacher'], score=random.randint(1, 100)),
# CourseGrade(course_name='数学', teacher=self.info['teacher'], score=random.randint(1, 100)),
# CourseGrade(course_name='英语', teacher=self.info['teacher'], score=random.randint(1, 100)),
# ]
# )
print(result, result.grades)
# result = student.save()
return result
def get_one_student(self):
"""查询一个学生的信息"""
student_info = Student.objects.first()
print(student_info, student_info.id, student_info.stu_name, student_info.address)
return student_info
def get_all_student(self):
"""查询所有学生的信息"""
student_all_info = Student.objects.all()
print(student_all_info)
return student_all_info
def get_all_student2(self):
"""查询所有学生的信息,并测试在objects里面填写查询条件"""
student_all_info = Student.objects.all()
print(student_all_info, student_all_info.count())
query = None
student_all_info2 = Student.objects(query).all()
print(student_all_info2, student_all_info2.count())
student_all_info2 = Student.objects(age__gt=11).all()
print(student_all_info2, student_all_info2.count())
return student_all_info
def get_all_student_only(self):
"""查询学生的某个字段信息"""
student_all_info = Student.objects.only('stu_no').first()
print(student_all_info.stu_no, student_all_info.stu_name)
student_all_info = Student.objects.only('stu_no', 'stu_name').first()
print(student_all_info.stu_no, student_all_info.stu_name)
return student_all_info
def get_student_by_id(self, pk: str):
"""根据学生的id查询"""
student_info_id = Student.objects.get(id=pk)
print(student_info_id)
return student_info_id
def get_student_1(self):
"""获取大于12岁的学生信息"""
result = Student.objects.filter(age__gt=12)
print(result)
return result
def get_student_2(self):
"""获取所有姓李的学生"""
result = Student.objects.filter(stu_name__startswith='李')
print(result)
return result
def get_student_3(self):
"""查询年龄在9~12之间(含)的学生信息"""
# SELECT * FROM school_student_info WHERE age BETWEEN 9 AND 12;
# db.students.find({'age': {'$gte': 9, '$lte': 12}})
result = Student.objects.filter(Q(age__gte=9) & Q(age__lte=12))
print(result)
return result
def get_student_4(self):
"""查询所有12岁以上的男生和9岁以下的女生"""
result = Student.objects.filter(Q(age__gt=12, sex=SexEnum.MAN) | Q(age__lt=9, sex=SexEnum.WOMAN))
print(result)
return result
def get_student_5(self):
"""查询所有12岁以上的男生和9岁以下的女生总数"""
result = Student.objects.filter(Q(age__gt=12, sex=SexEnum.MAN) | Q(age__lt=9, sex=SexEnum.WOMAN)).count()
print(result)
return result
def get_man_sex(self):
"""统计性别为男的总人数,并求出其平均年龄和总年龄"""
queryset = Student.objects.filter(sex='男')
print(queryset)
man_num_count = queryset.count()
print(man_num_count)
man_avg_age = queryset.average('age')
print(man_avg_age)
man_sum_age = queryset.sum('age')
print(man_sum_age)
return man_avg_age, man_sum_age
def get_man_age_total_agg(self):
"""使用聚合管道获得男性年龄总和"""
pipeline = [
{
"$match": {
"sex": '男'
}
},
{
"$group": {
"_id": None,
"total": {
"$sum": '$age'
}
}
}
]
# man_sum_age_agg = Student.objects.filter().aggregate(pipeline)
man_sum_age_agg = Student.objects.aggregate(pipeline)
print(man_sum_age_agg) # 返回的是一个对象
for data in man_sum_age_agg:
print(data)
return data
def get_grade(self):
"""查询大于等于80分的学生成绩信息"""
result = Student.objects.filter(grades__score__gte=80) # 注意这儿写法
print(result)
for i in result:
print(i.grades)
return result
def get_grade_2(self):
"""查询大于等于80分的学生成绩信息"""
result = Student.objects.filter(grades__score__gte=80) # 注意这儿写法
print(result)
for i in result:
print(i.grades)
print("*" * 100)
"""查询语文成绩大于等于80分的学生成绩信息"""
result = Student.objects.filter(grades__0__score__gte=80) # 注意:这儿0代表嵌套列表中的第一列
print(result)
for i in result:
print(i.grades)
return result
def get_max_min_age(self):
"""获取最大年龄和最小年龄的学生信息"""
queryset_min_age = Student.objects.order_by('age').first()
queryset_max_age = Student.objects.order_by('-age').first()
print(queryset_min_age, queryset_min_age.age)
print(queryset_max_age, queryset_max_age.age)
return '200 OK'
def paginate(self, page: int = 1, page_size: int = 5):
"""
分页处理
:param page: 当前是第几页
:param page_size: 每页有多少数据
:return:
"""
# 方法一:切片
start = (page - 1) * page_size
end = start + page_size
queryset1 = Student.objects.all()[start:end]
print(queryset1)
# 方法二:skip().limit()
queryset2 = Student.objects.skip(start).limit(page_size)
print(queryset2)
return "200 OK"
def update_one(self):
"""修改一条数据"""
queryset = Student.objects.filter(stu_no='2438197')
print(queryset)
result = queryset.update_one(stu_name='白龙马', phone_no='16452412564')
# result = queryset.update_one(stu_name='白龙马', unset__phone_no=True)
print(result)
def update_one_2(self):
"""修改一条数据"""
queryset = Student.objects.filter(stu_no=3152784).first()
print(queryset)
if queryset:
queryset.stu_name = '沙和尚'
result = queryset.save()
print(result)
return "200 OK"
else:
return "error"
def update_many(self):
"""将年龄10岁的学生年龄加一岁"""
queryset = Student.objects.filter(age=10)
print(queryset)
queryset.update(inc__age=1)
print(queryset)
return "200 OK"
def delete_data(self):
"""删除年龄大于13岁的学生"""
queryset_start = Student.objects.all()
print(f"删除前学生总数量:{queryset_start.count()}")
queryset = Student.objects.filter(age__gt=13)
print(f'删除的学生的数量:{queryset.count()}')
res = queryset.delete()
print(f"删除的结果:{res}")
queryset_end = Student.objects.all()
print(f"删除后剩余学生总数量:{queryset_end.count()}")
if __name__ == '__main__':
# 自我测试代码
pass
model.py
from mongoengine import Document, connect, EnumField, StringField, IntField, ListField, EmbeddedDocument, \
EmbeddedDocumentField
from enum import Enum
# 连接到User数据库
connect('user', host='192.168.124.104', port=27017)
class SexEnum(Enum):
MAN = '男'
WOMAN = '女'
class CourseGrade(EmbeddedDocument):
"""成绩信息(科目、老师、成绩)-被嵌套的文档"""
course_name = StringField(max_length=64, required=True, verbose_name='科目')
teacher = StringField(max_length=16, verbose_name='老师')
score = IntField(min_value=0, max_value=150, required=True, verbose_name='成绩')
def __repr__(self):
return f"CourseGrade({self.course_name},{self.score})"
def __str__(self):
return self.__repr__()
class Student(Document):
"""学生信息"""
# verbose_name 自定义参数,用于显示定义域的名称
stu_no = IntField(required=True, unique=True, verbose_name='学号')
stu_name = StringField(required=True, max_length=16, verbose_name='姓名')
sex = EnumField(enum=SexEnum, verbose_name='性别')
class_name = StringField(max_length=10, verbose_name='班级')
address = StringField(max_length=255, verbose_name='家庭住址')
phone_no = StringField(max_length=11, verbose_name='电话号码')
age = IntField(min_value=0, max_value=150, verbose_name='年龄')
grades = ListField(EmbeddedDocumentField(CourseGrade), verbose_name='成绩数组')
meta = {
# 指定文档的集合
'collection': 'students',
# 指定排序,可以指定多个域。例如:'age':根据年龄升序,'-age':根据年龄降序
'ordering': ['-age'],
'strict': False # 设置非严格校验字段则不需要吧所有字段都声明
}
def __repr__(self):
return f'Grade({self.stu_no}, {self.stu_name})'
def __str__(self):
return self.__repr__()
class Grade(Document):
"""学生成绩"""
# verbose_name 自定义参数,用于显示定义域的名称
stu_no = IntField(required=True, verbose_name="学号")
stu_name = StringField(required=True, max_length=16, verbose_name='姓名')
sex = EnumField(enum=SexEnum, verbose_name='性别')
class_name = StringField(max_length=10, verbose_name='班级')
address = StringField(max_length=255, verbose_name='家庭住址')
phone_no = StringField(max_length=11, verbose_name='电话号码')
age = IntField(min_value=0, max_value=150, verbose_name='年龄')
grades = EmbeddedDocumentField(CourseGrade, verbose_name='成绩')
meta = {
# 指定文档的集合
'collection': 'grades',
# 指定排序,可以指定多个域。例如:'age':根据年龄升序,'-age':根据年龄降序
'ordering': ['-age']
}
def __repr__(self):
return f'Grade({self.stu_no}, {self.stu_name})'
def __str__(self):
return self.__repr__()
官方文档: https://docs.mongoengine.org/guide/querying.html