• [技术干货] 在CentOS 7上安装MongoDB数据库的方法步骤 --转载
    简介MongoDB 是一个免费、开源的面向文档的数据库。它被归类为 NoSQL 数据库,因为它不依赖于传统的基于表的关系数据库结构。相反,它使用具有动态模式的类似 JSON 的文档。与关系数据库不同,MongoDB 在向数据库添加数据之前不需要预定义模式。您可以随时和任意次数地更改模式,而无需设置具有更新后模式的新数据库。在本教程中,您将在 CentOS 7 服务器上安装 MongoDB 社区版。先决条件在按照本教程之前,请确保您拥有:一个具有非 root sudo 权限的 CentOS 7 服务器。有关指导,请参阅我们的教程《在 CentOS 7 上进行初始服务器设置》。步骤 1 – 添加 MongoDB 仓库mongodb-org 软件包在 CentOS 的默认仓库中不存在。但是,MongoDB 维护了一个专用的仓库。让我们将其添加到我们的服务器上。使用 vi 编辑器,为 CentOS 的软件包管理实用程序 yum 创建一个 .repo 文件:1sudo vi /etc/yum.repos.d/mongodb-org.repo然后,访问 MongoDB 文档的 Red Hat 安装部分,并将最新稳定版本的仓库信息添加到文件中:123456[mongodb-org-6.0]name=MongoDB Repositorybaseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/6.0/x86_64/gpgcheck=1enabled=1gpgkey=https://www.mongodb.org/static/pgp/server-6.0.asc通过按 ESC 键保存文件更改,然后输入 :wq,并按 ENTER 键。在继续之前,您应该验证 MongoDB 仓库是否存在于 yum 实用程序中。repolist 命令显示已启用的仓库列表:1yum repolist1234567. . .repo id                          repo namebase/7/x86_64                    CentOS-7 - Baseextras/7/x86_64                  CentOS-7 - Extrasmongodb-org-6.0/7/x86_64         MongoDB Repositoryupdates/7/x86_64                 CentOS-7 - Updates. . .有了 MongoDB Repository,您可以继续进行安装。步骤 2 – 安装 MongoDB您可以使用 yum 实用程序从第三方仓库安装 mongodb-org 软件包。1sudo yum install mongodb-org会有两个 Is this ok [y/N]: 提示。第一个允许安装 MongoDB 软件包,第二个导入 GPG 密钥。MongoDB 的发布者使用密钥来确认已下载软件包的完整性。在每个提示处,输入 Y,然后按 ENTER 键。接下来,使用 systemctl 实用程序启动 MongoDB 服务:1sudo systemctl start mongod虽然在本教程中我们不会使用它们,但您也可以使用 reload 和 stop 命令更改 MongoDB 服务的状态。reload 命令请求 mongod 进程读取配置文件 /etc/mongod.conf,并应用任何更改,而无需重新启动。1sudo systemctl reload mongodstop 命令停止所有正在运行的 mongod 进程。1sudo systemctl stop mongod在执行 start 命令后,systemctl 实用程序没有提供结果,但您可以通过使用 tail 命令查看 mongod.log 文件的末尾来检查服务是否已启动:1sudo tail /var/log/mongodb/mongod.log12. . .[initandlisten] waiting for connections on port 27017等待连接 的输出确认了 MongoDB 已成功启动,并且您可以使用 MongoDB Shell 访问数据库服务器:1mongo要了解如何从 shell 与 MongoDB 交互,您可以查看 db.help() 方法的输出,该方法提供了 db 对象的方法列表。1db.help()1234567891011DB methods:    db.adminCommand(nameOrDocument) - 切换到 'admin' 数据库,并运行命令 [ 仅调用 db.runCommand(...) ]    db.auth(username, password)    db.cloneDatabase(fromhost)    db.commandHelp(name) 返回命令的帮助    db.copyDatabase(fromdb, todb, fromhost)    db.createCollection(name, { size : ..., capped : ..., max : ... } )    db.createUser(userDocument)    db.currentOp() 显示数据库中当前正在执行的操作    db.dropDatabase(). . .让 mongod 进程在后台运行,但使用 exit 命令退出 shell:1exit1Bye步骤 3 – 验证启动由于一个依赖数据库的应用程序在没有数据库的情况下无法正常运行,我们将确保 MongoDB 守护程序 mongod 会随系统启动而启动。使用 systemctl 实用程序来检查其启动状态:1systemctl is-enabled mongod; echo $?输出为零表示已启用守护程序,这是我们想要的。而输出为一则表示守护程序被禁用,将不会启动。123. . .enabled0如果守护程序被禁用,使用 systemctl 实用程序来启用它:1sudo systemctl enable mongod现在你已经有一个运行中的 MongoDB 实例,它将在系统重新启动后自动启动。步骤 4 – 导入示例数据集(可选)与其他数据库服务器不同,MongoDB 不会在其 test 数据库中提供数据。由于我们不希望使用生产数据来尝试新软件,我们将从 MongoDB 示例中下载一个样本数据集。这个 JSON 文档包含了一组餐馆数据,我们将用它来练习与 MongoDB 的交互,并避免对敏感数据造成伤害。首先进入一个可写目录:1cd /tmp使用 curl 命令和 MongoDB 的链接来下载 JSON 文件:1curl -LO https://raw.githubusercontent.com/mongodb/docs-assets/primer-dataset/primer-dataset.jsonmongoimport 命令将数据插入到 test 数据库中。--db 标志定义了要使用的数据库,而 --collection 标志指定了信息将被存储在数据库中的位置,--file 标志告诉命令在哪个文件上执行导入操作:1mongoimport --db test --collection restaurants --file /tmp/primer-dataset.json输出确认了从 primer-dataset.json 文件导入数据:12connected to: localhostimported 25359 documents有了样本数据集,你可以对其执行查询。重新启动 MongoDB Shell:1mongoShell 默认选择 test 数据库,这就是你导入数据的地方。使用 find() 方法查询 restaurants 集合,以显示数据集中所有餐馆的列表。由于集合包含超过 25,000 条记录,使用可选的 limit() 方法将查询的输出减少到指定数量。此外,pretty() 方法通过换行和缩进使信息更易读。1db.restaurants.find().limit(1).pretty()12345678910111213141516171819202122232425262728293031323334353637383940414243{    "_id" : ObjectId("57e0443b46af7966d1c8fa68"),    "address" : {        "building" : "1007",        "coord" : [            -73.856077,            40.848447        ],        "street" : "Morris Park Ave",        "zipcode" : "10462"    },    "borough" : "Bronx",    "cuisine" : "Bakery",    "grades" : [        {            "date" : ISODate("2014-03-03T00:00:00Z"),            "grade" : "A",            "score" : 2        },        {            "date" : ISODate("2013-09-11T00:00:00Z"),            "grade" : "A",            "score" : 6        },        {            "date" : ISODate("2013-01-24T00:00:00Z"),            "grade" : "A",            "score" : 10        },        {            "date" : ISODate("2011-11-23T00:00:00Z"),            "grade" : "A",            "score" : 9        },        {            "date" : ISODate("2011-03-10T00:00:00Z"),            "grade" : "B",            "score" : 14        }    ],    "name" : "Morris Park Bake Shop",    "restaurant_id" : "30075445"} 你可以继续使用样本数据集来熟悉 MongoDB,或者使用 db.restaurants.drop() 方法删除它:1db.restaurants.drop()最后,使用 exit 命令退出 Shell:1exit1Bye
  • [技术干货] MongoDB中的Primary Shard详解 --转载
    什么是Primary Shard在MongoDB的Sharding架构中,每个database中都可以存储两种类型的集合,一种是未分片的集合,一种是通过分片键,被打散的集合。被分片键打散的集合数据可以均匀的分布在各个分片上;而对于未分片的集合,则只会存储在所在的database的Primary Shard中,每个database有且只有一个Primary Shard。简单来说就是,Primary Shard存储了当前数据库未分片的集合。其中Collection I为分片集合,数据被打散到不同的分片上。Collection 2作为未分片集合所有的所有都只存储在Primary Shard中,这里它的Primary Shard为Shard A。Primary Shard的选择及问题当我们通过mongos创建一个database时,mongos会根据当前每个分片中存储的数据量来决定哪个分片为当前数据库的Primary Shard,存储数据量最少的作为当前数据库的主分片。数据量判断依据为listDatabases命令中的totalSize的值。比如我们新建一个test1库,并在里面创建一个未分片的集合,mongos为它挑选的Primary Shard为shard2:12345678910111213141516171819202122[direct: mongos] test> use test1;switched to db test1[direct: mongos] test1> db.foo.insertOne({"name":"aaa"});{  acknowledged: true,  insertedId: ObjectId("66c6f060b47ae218b4ef12aa")}[direct: mongos] test1> sh.status()...  {    database: {      _id: 'test1',      primary: 'shard2',      partitioned: false,      version: {        uuid: new UUID("a419673b-944b-4cdd-8d11-f3074bbf43fb"),        timestamp: Timestamp({ t: 1724313695, i: 1 }),        lastMod: 1      }    },    collections: {}  }Primary Shard自动选择存在的问题问题一假设有如下需求:有两个专门存储配置集合的database,由于是配置集合,所有数据量比较小,不需要再将其数据打散,但是为了方面管理,需要将这两个配置库存放在同一个分片中。换句话说就是这两个配置库的Primary Shard需要为同一个。我们知道,mongos在选择Primary Shard的时候是根据当前各个分片现有的数据量来决定的,但是如果当前各个分片的数据量都比较均衡,当我们的业务数据在持续变动的过程中,mongos在选择Primary Shard的时候就会表现出随机性--先后创建的database的Primary Shard极有可能会是不同的分片。问题二假设目前需要有一个日志库,然后每个月都会以月份为后缀进行切库,就是说6月份的话我会创建一个叫DB_202406的库,7月份的话会创建一个叫做DB_202407的库,集合的月增量为1个T。由于增量比较大,后面肯定是需要不断加机器扩容的,当单台服务器的磁盘IO能力不是瓶颈的情况下,为了后期加机器扩容时不需要进行历史数据的均衡,所以日志集合的选择了未分片。那么当我进行月底切库,即创建新库的时候,mongos就会判断各个分片上的现有数据量,然后找一个数据量相对较小的分片作为Primary Shard,此时,如果各个分片的服务器磁盘并不是相同大小的,比如说A分片的磁盘大小为10T,B分片磁盘的大小为5T,但是A分片存储的数据为4.8T,B分片存储的数据为4.5T,这时候由于B分片上存储的数据少,mongos会将B分片选为新库的Primary Shard,但是实际情况是B分片所在的主机空闲空间已经完全不足以支持本月的月增数据量了,后面就只能临时扩容本机或删数据或将数据在线同步迁移到有足够空间的新机器上,并进行应用切换。如何解决针对上面存在的两个问题,根本原因还是由于在新建库后,mongs自动为当前库选择的Primary Shard可能会出现不合理性,所以我们在进行建库的时候,最好的办法是直接指定新库的Primary Shard,这样就可以规避掉上面两个问题。而且针对第二个问题,还需要做的是当业务上线前需要做好的架构规划和容量规划等。建库后指定当前库的Primary Shard:1234567891011121314151617181920212223242526272829[direct: mongos] test> use DB1switched to db DB1[direct: mongos] DB1> sh.enableSharding("DB1","shard1") {  ok: 1,  '$clusterTime': {    clusterTime: Timestamp({ t: 1724317769, i: 3 }),    signature: {      hash: Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),      keyId: Long("0")    }  },  operationTime: Timestamp({ t: 1724317769, i: 1 })}[direct: mongos] DB1> sh.status()...  {    database: {      _id: 'DB1',      primary: 'shard1',      partitioned: false,      version: {        uuid: new UUID("ec0e50b8-3029-4333-887b-9ca0b54c4e20"),        timestamp: Timestamp({ t: 1724317768, i: 1 }),        lastMod: 1      }    },    collections: {}  }只能在use新库之后,立即指定新库的Primary Shard,不能等创建过集合并写入数据之后再指定,因为那时mongos已经为新库选好了Primary Shard,就不支持再次修改了12[direct: mongos] DB1> sh.enableSharding("DB1","shard3") MongoServerError: Database DB1 could not be created :: caused by :: database already creat
  • [技术干货] MongoDB配置用户名和密码的操作步骤 --转载
    要在 MongoDB 中配置用户名和密码,可以按照以下步骤进行操作:1. 启动 MongoDB 服务并连接首先确保 MongoDB 服务正在运行。然后,使用 MongoDB 客户端连接到 MongoDB 服务器。可以使用 mongo 命令行工具或者 MongoDB Compass 进行连接。使用 mongo 命令行工具连接:打开命令提示符(Windows)或终端(Linux/macOS),输入以下命令:1mongo这将连接到默认的 MongoDB 服务器(通常是本地的 localhost:27017)。2. 创建管理员用户(可选)如果还没有管理员用户,首先需要创建一个管理员用户,用于管理 MongoDB 数据库的访问和权限。以下是创建管理员用户的示例:123456use admindb.createUser({    user: "adminUser",    pwd: "adminPassword",    roles: [{ role: "userAdminAnyDatabase", db: "admin" }]})adminUser 是管理员用户名,adminPassword 是管理员用户的密码。roles 指定了管理员用户在 admin 数据库上的权限,这里是 userAdminAnyDatabase,允许管理任何数据库中的用户。3. 创建数据库用户接下来,为你的具体数据库(比如一个叫做 mydatabase 的数据库)创建用户。首先切换到要创建用户的数据库(如果不存在则会创建):1use mydatabase然后,创建一个用户并授予适当的角色:12345db.createUser({    user: "dbUser",    pwd: "dbPassword",    roles: [{ role: "readWrite", db: "mydatabase" }]})dbUser 是数据库用户名,dbPassword 是数据库用户的密码。roles 指定了用户在 mydatabase 数据库上的角色权限,这里是 readWrite,允许读写这个数据库中的数据。4. 退出并重新连接完成用户创建后,退出当前 MongoDB 连接,然后使用新创建的用户重新连接到 MongoDB 服务器:1exit然后,使用新创建的用户重新连接:1mongo -u dbUser -p dbPassword --authenticationDatabase mydatabase-u dbUser 和 -p dbPassword 分别指定了用户名和密码。--authenticationDatabase mydatabase 指定了用于认证的数据库,这里是 mydatabase,因为我们创建的用户是在 mydatabase 数据库下。5. 配置 MongoDB 以允许身份验证如果 MongoDB 服务器默认没有启用身份验证,你需要修改 MongoDB 的配置文件 mongod.conf 并重新启动 MongoDB 服务。在 mongod.conf 文件中添加或修改以下行:12security:  authorization: enabled保存并关闭文件。然后,重启 MongoDB 服务以使更改生效。注意事项安全性:确保密码强度足够,并且不要将密码硬编码在应用程序中。权限管理:根据实际需求分配合适的角色和权限给用户。生产环境:上述步骤适用于开发和测试环境。在生产环境中,建议根据安全最佳实践进行更详细的配置和管理。
  • [技术干货] SpringBoot集成Mongodb的操作方法 --转载
    1.什么是mongoDB?    MongoDB是一种非关系型数据库,被广泛用于大型数据存储和分布式系统的构建。MongoDB支持的数据模型比传统的关系型数据库更加灵活,支持动态查询和索引,也支持json格式和bson格式的数据存储,这种格式可以支持读取和写入大量的数据。2.Docker安装mongoDB2.1拉取镜像1docker pull mongo:4.42.2创建mongo数据持久化目录 (看自己爱好,放哪里都行)1mkdir -p /docker_volume/mongodb/data2.3启动容器1docker run -itd --name mongo -v /docker_volume/mongodb/data:/data/db -p 27017:27017 mongo:4.4 --auth2.4创建用户   1.使用以下命令进入容器的 MongoDB shell:1docker exec -it mongo mongo admin  2.创建一个用户,mongo 默认没有用户12345678db.createUser({  user: 'root',  pwd: 'xgd@123',  roles: [    { role: 'readWriteAnyDatabase', db: 'admin' },    { role: 'dbAdminAnyDatabase', db: 'admin' }  ]});命令解释:-d:表示在后台运行容器,并返回容器 ID。-i:表示交互式运行容器,保留 STDIN 打开。--name mongo-service:为容器指定一个名称,这里为 mongo-service。--restart=always:表示在容器退出后自动重启容器。-p 27017:27017:将容器的 27017 端口映射到宿主机的 27017 端口。-v ~/data/mongodata:/data:将宿主机的 ~/data/mongodata 目录挂载到容器的 /data 目录下,用于持久化存储数据。mongo:指定要运行的镜像名称,这里为官方的 MongoDB 镜像。3.SpringBoot整合MongoDB步骤 1.导入依赖1234<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-data-mongodb</artifactId></dependency>2.配置123456789spring:  data:    mongodb:      host: 你的ip      port: 27017      username: root      password: 123      authenticationDatabase: admin      database: test3.操作MongoDB     第一种方式使用MongoTemplate‌工具类   添加实体类123456789101112131415import lombok.Data;import org.springframework.data.mongodb.core.mapping.Document;import java.io.Serializable;import java.util.Date;/** * 映射表 test_demo */@Data@Document("test_demo")public class TestDemo implements Serializable {    private static final long serialVersionUID = 1L;    private String id;    private String name;    private Date birthDay;}  测试12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849/** * MongoDB测试类 * */@SpringBootTest(classes = MongoApplication.class)@RunWith(SpringRunner.class)public class MongoTest1 {    @Autowired    private MongoTemplate mongoTemplate;    //保存    @Test    public void saveTest(){        for (int i = 0; i < 10; i++) {            TestDemo testDemo = new TestDemo();            testDemo.setName("张三");            testDemo.setBirthDay(new Date());             mongoTemplate.save(testDemo);        }        TestDemo testDemo = new TestDemo();        testDemo.setName("张三你吃饭了吗");        testDemo.setBirthDay(new Date());        mongoTemplate.save(testDemo);    }    //查询一个    @Test    public void saveFindOne(){        TestDemo testDemo = mongoTemplate.findById("661743b77bee2f0a5739819d", TestDemo.class);        System.out.println(testDemo);        //TestDemo(id=661743b77bee2f0a5739819d, name=张三, birthDay=Thu Apr 11 09:58:15 CST 2024)    }    //条件查询    @Test    public void testQuery(){        //查询字段name为张三的数据(多条件查询)        Query query = Query.query(Criteria.where("name").is("张三"))                .with(Sort.by(Sort.Direction.DESC,"birthDay"));        // 执行查询 模糊查询 只查询5条数据        Query query1 = Query.query(Criteria.where("name").regex(".*?\\" + "张三" + ".*"));        query.limit(5);        List<TestDemo> list = mongoTemplate.find(query, TestDemo.class);        List<TestDemo> list1 = mongoTemplate.find(query1, TestDemo.class);        System.out.println("list:"+list);        System.out.println("list1:"+list1);    }    //测试删除    @Test    public void testDel(){        mongoTemplate.remove(Query.query(Criteria.where("name").is("张三")),TestDemo.class);    }}第二种方式使用继承MongoRepository   添加实体类1234567891011121314151617181920package com.xsp.spm.domain;import lombok.Data;import org.springframework.data.annotation.Id;import org.springframework.data.mongodb.core.index.Indexed;import org.springframework.data.mongodb.core.mapping.Document;/** * @Author:xsp * @Description: * @name:User * @Date:2024/9/14 17:24 */@Data@Document(collection = "users")public class User {    @Id    private String id;    @Indexed(unique = true)    private String name;    private Integer age;}repository层12345678910111213package com.xsp.spm.repository;import com.xsp.spm.domain.User;import org.springframework.data.mongodb.repository.MongoRepository;import org.springframework.stereotype.Repository;/** * @Author:xsp * @Description: * @name:UserRepository * @Date:2024/9/14 17:25 */@Repositorypublic interface UserRepository extends MongoRepository<User, String> {}controller层123456789101112131415161718192021222324252627282930313233343536373839404142434445464748package com.xsp.spm.controller;import com.xsp.spm.domain.User;import com.xsp.spm.repository.UserRepository;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.web.bind.annotation.*;/** * @Author:xsp * @Description: * @name:UserController * @Date:2024/9/14 17:26 */@RestControllerpublic class UserController {    @Autowired    private UserRepository userRepository;    // 添加    @PostMapping    public User createUser(@RequestBody User user) {        return userRepository.save(user);    }    // 详细    @GetMapping("/{id}")    public User getUser(@PathVariable String id) {        return userRepository.findById(id).orElse(null);    }    // 删除    @DeleteMapping("/{id}")    public void deleteUser(@PathVariable String id) {        userRepository.deleteById(id);    }    // 修改    @PutMapping("/{id}")    public User updateUser(@PathVariable String id, @RequestBody User user) {        User userFromDb = userRepository.findById(id).orElse(null);        if (userFromDb != null) {            userFromDb.setName(user.getName());            userFromDb.setAge(user.getAge());            return userRepository.save(userFromDb);        }        return null;    }    // 列表    @GetMapping    public Iterable<User> getAllUsers() {        return userRepository.findAll();    }}mongoDB 中MongoRepository和MongoTemplate的区别MongoRepository‌:      特点‌:MongoRepository是Spring Data MongoDB的一部分,它提供了一种面向对象的方式来操作MongoDB。通过继承MongoRepository接口,可以很容易地实现CRUD操作和一些基本的查询操作。这种方式不需要编写原生的查询语句,使得操作更加简单和直观‌1。‌       适用场景‌:当查询需求简单,可以通过方法名命名规则进行快速的CRUD操作时,使用MongoRepository是一个不错的选择。它适用于基本的CRUD操作和一些简单查询,但如果需要执行复杂的聚合或定制查询,可能不够灵活‌1。MongoTemplate‌:    特点‌:MongoTemplate是Spring Data MongoDB中更低级别的工具,它提供了更丰富的操作方法,包括条件查询、模糊查询、分页查询、数据修改和删除等。MongoTemplate允许更灵活的操作,包括原生的查询语句,提供了更大的灵活性来处理复杂的查询和数据操作‌2。 ‌    适用场景‌:当需要执行复杂的查询或定制操作时,MongoTemplate可以弥补MongoRepository的不足。它适用于需要更多控制和灵活性的场景,如执行复杂的聚合查询或定制的数据处理任务‌总结来说,MongoRepository适合简单的CRUD操作和一些基本的查询需求,而MongoTemplate则提供了更丰富的功能和更大的灵活性,适用于需要执行复杂查询或定制操作的场景。选择使用哪一个取决于具体的应用需求和技术栈‌
  • [技术干货] mongodb 中rs.stauts()命令参数解析 --转载
    rs.status()命令用于获取MongoDB副本集的状态信息。它提供了关于副本集中各个节点的详细信息,包括节点的健康状况、角色、选举状态等。  以下是查看一个mongo集群状态返回的参数:123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138rs0:PRIMARY> rs.status(){        "set" : "rs0",        "date" : ISODate("2024-09-14T06:44:36.882Z"),        "myState" : 1,        "term" : NumberLong(510),        "syncingTo" : "",        "syncSourceHost" : "",        "syncSourceId" : -1,        "heartbeatIntervalMillis" : NumberLong(2000),        "majorityVoteCount" : 2,        "writeMajorityCount" : 2,        "optimes" : {                "lastCommittedOpTime" : {                        "ts" : Timestamp(0, 0),                        "t" : NumberLong(-1)                },                "lastCommittedWallTime" : ISODate("1970-01-01T00:00:00Z"),                "appliedOpTime" : {                        "ts" : Timestamp(1726296270, 1),                        "t" : NumberLong(510)                },                "durableOpTime" : {                        "ts" : Timestamp(1726296270, 1),                        "t" : NumberLong(510)                },                "lastAppliedWallTime" : ISODate("2024-09-14T06:44:30.859Z"),                "lastDurableWallTime" : ISODate("2024-09-14T06:44:30.859Z")        },        "lastStableRecoveryTimestamp" : Timestamp(1725300368, 3),        "lastStableCheckpointTimestamp" : Timestamp(1725300368, 3),        "electionCandidateMetrics" : {                "lastElectionReason" : "electionTimeout",                "lastElectionDate" : ISODate("2024-09-14T06:28:20.630Z"),                "electionTerm" : NumberLong(510),                "lastCommittedOpTimeAtElection" : {                        "ts" : Timestamp(0, 0),                        "t" : NumberLong(-1)                },                "lastSeenOpTimeAtElection" : {                        "ts" : Timestamp(1726284227, 1),                        "t" : NumberLong(509)                },                "numVotesNeeded" : 2,                "priorityAtElection" : 2,                "electionTimeoutMillis" : NumberLong(10000),                "numCatchUpOps" : NumberLong(0),                "newTermStartDate" : ISODate("2024-09-14T06:28:20.830Z")        },        "members" : [                {                        "_id" : 0,                        "name" : "mongo1:27017",                        "health" : 1,                        "state" : 9,                        "stateStr" : "ROLLBACK",                        "uptime" : 987,                        "optime" : {                                "ts" : Timestamp(1726197065, 1),                                "t" : NumberLong(505)                        },                        "optimeDurable" : {                                "ts" : Timestamp(1726197065, 1),                                "t" : NumberLong(505)                        },                        "optimeDate" : ISODate("2024-09-13T03:11:05Z"),                        "optimeDurableDate" : ISODate("2024-09-13T03:11:05Z"),                        "lastHeartbeat" : ISODate("2024-09-14T06:44:35.841Z"),                        "lastHeartbeatRecv" : ISODate("2024-09-14T06:44:36.665Z"),                        "pingMs" : NumberLong(0),                        "lastHeartbeatMessage" : "",                        "syncingTo" : "mongo2:27017",                        "syncSourceHost" : "mongo2:27017",                        "syncSourceId" : 1,                        "infoMessage" : "",                        "configVersion" : 1950478                },                {                        "_id" : 1,                        "name" : "mongo2:27017",                        "health" : 1,                        "state" : 1,                        "stateStr" : "PRIMARY",                        "uptime" : 990,                        "optime" : {                                "ts" : Timestamp(1726296270, 1),                                "t" : NumberLong(510)                        },                        "optimeDate" : ISODate("2024-09-14T06:44:30Z"),                        "syncingTo" : "",                        "syncSourceHost" : "",                        "syncSourceId" : -1,                        "infoMessage" : "",                        "electionTime" : Timestamp(1726295300, 1),                        "electionDate" : ISODate("2024-09-14T06:28:20Z"),                        "configVersion" : 1950478,                        "self" : true,                        "lastHeartbeatMessage" : ""                },                {                        "_id" : 2,                        "name" : "mongo3:27017",                        "health" : 1,                        "state" : 2,                        "stateStr" : "SECONDARY",                        "uptime" : 987,                        "optime" : {                                "ts" : Timestamp(1726197065, 1),                                "t" : NumberLong(505)                        },                        "optimeDurable" : {                                "ts" : Timestamp(1726197065, 1),                                "t" : NumberLong(505)                        },                        "optimeDate" : ISODate("2024-09-13T03:11:05Z"),                        "optimeDurableDate" : ISODate("2024-09-13T03:11:05Z"),                        "lastHeartbeat" : ISODate("2024-09-14T06:44:34.930Z"),                        "lastHeartbeatRecv" : ISODate("1970-01-01T00:00:00Z"),                        "pingMs" : NumberLong(1),                        "lastHeartbeatMessage" : "",                        "syncingTo" : "",                        "syncSourceHost" : "",                        "syncSourceId" : -1,                        "infoMessage" : "",                        "configVersion" : 1829326                }        ],        "ok" : 1,        "$clusterTime" : {                "clusterTime" : Timestamp(1726296270, 1),                "signature" : {                        "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),                        "keyId" : NumberLong(0)                }        },        "operationTime" : Timestamp(1726296270, 1)}rs0:PRIMARY>以下是rs.status()响应字段的意义及其对应值的整理:字段意义示例值set副本集的名称"rs0"date响应生成的时间ISODate("2024-09-14T06:44:36Z")myState当前节点的状态(1: PRIMARY, 2: SECONDARY, 3: RECOVERING等):常见的状态包括:PRIMARY (1): 当前节点是主节点,负责处理所有写入操作。SECONDARY (2): 当前节点是从节点,复制主节点的数据并提供读取服务。ARBITER (7): 当前节点是仲裁者,不存储数据,仅参与选举过程。OTHER (8): 当前节点的状态不属于上述任何一种,通常是由于配置或网络问题。RECOVERING (9): 当前节点正在恢复中,通常是从不健康状态恢复。DOWN (10): 当前节点不可用,可能是由于网络问题或故障。STARTUP (11): 当前节点正在启动,尚未完成初始化。STARTUP2 (12): 当前节点在启动的第二阶段,正在进行数据同步。UNKNOWN (13): 当前节点的状态未知,可能是由于网络分区或其他问题。1term当前选举周期510lastElectionReason最近一次选举的原因"electionTimeout"members副本集成员的详细信息数组,包含各个节点的信息health节点的健康状态(1: 健康, 0: 不健康)1stateStr节点的状态描述(如 PRIMARY, SECONDARY, ROLLBACK等)"SECONDARY"uptime节点的运行时间(秒)987optime最近一次操作的时间戳Timestamp(1726197065, 1)optimeDurable最近一次持久化操作的时间戳Timestamp(1726197065, 1)optimeDate最近一次操作的日期ISODate("2024-09-13T03:11:05Z")optimeDurableDate最近一次持久化操作的日期ISODate("2024-09-13T03:11:05Z")lastHeartbeat最近一次心跳信号的时间ISODate("2024-09-14T06:44:34.930Z")lastHeartbeatRecv最近一次接收到心跳信号的时间ISODate("1970-01-01T00:00:00Z")pingMs节点的延迟(毫秒)NumberLong(1)lastHeartbeatMessage最近一次心跳的消息""syncingTo当前节点正在同步的目标节点""syncSourceHost当前节点的同步源主机""syncSourceId当前节点的同步源ID-1infoMessage额外的信息消息""configVersion配置版本号1829326$clusterTime集群时间信息包含 clusterTime 和 signatureoperationTime最近一次操作的时间Timestamp(1726296270, 1)使用场景故障排查: 当副本集出现问题时,使用rs.status()可以快速定位故障节点。性能监控: 定期检查副本集状态,以确保所有节点正常运行并及时发现性能瓶颈。维护操作: 在进行维护或升级操作前,确认副本集的健康状况。选举监控: 监控选举过程,确保主节点的选举和切换正常进行。
  • [技术干货] 批量清理mongodb历史数据的方法详解 --转载
    批量清理mongodb历史数据清理程序的原来目前项目组上很多平台上线历史数据积压,导致入库查询数据缓慢,历史数据有些已经归档,进行历史数据清理删除。之前临时写shell脚本,太简陋,重新使用Python进行改造,新增备份功能,和配置文件删除指定字段和时间范围内数据。代码篇123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322#!/usr/local/python3/bin/python3 import configparser,logging.config,sys,os,subprocessimport pymongo,ast# from pymongo import MongoClientfrom datetime import datetime,timedeltafrom urllib import parse def init_mongodb(MongoDBAuth):    if mongodb_auth:        username = parse.quote_plus(mongodb_user)        password = parse.quote_plus(mongodb_passwd)        ConnPasswd = "mongodb://" + username + ":" + password + "@" + mongodb_ip + ":" + mongodb_port + "/"        try:            clients = pymongo.MongoClient(ConnPasswd)            logger.info("init mongodb conn: " + ConnPasswd)            return clients        except  Exception as e:            logger.info("use mongodb user pass conn err: " +  str(e))            return False    else:        try:            clients = pymongo.MongoClient(mongodb_ip, int(mongodb_port))            logger.info("init mongodb conn: " + mongodb_ip +":" +mongodb_port)            return clients        except  Exception as e:            logger.info("use mongodb user pass conn err: " + str(e))            return False #查看出全部dbdef get_mongodb_dbname():    db_names_list = []    db_names  = mongo_client.list_database_names()    for db_name  in db_names:        db_names_list.append(db_name)    for filter_dbname in need_filter_dbname_list:        if filter_dbname in db_names_list:            db_names_list.remove(filter_dbname)            logger.info("delete need filter dbname: " + filter_dbname)    # logger.info("get all db_name: " +str(db_names_list))    return db_names_list #查询出db中全部表def get_mongodb_tables(entid):    db_collections_list = []    db=mongo_client[entid]    collections = db.list_collection_names()    for collection  in collections:        db_collections_list.append(collection)    logger.debug("get " + entid + " all collections: " +str(db_collections_list))    return db_collections_list #查询集合中索引索引和是否分片def get_index_key_tables(entid,collection_name):    index_list = []    formatted_results = []    db=mongo_client[entid]    collection=db[collection_name]    indexes = collection.list_indexes()    ns_name = entid + "." + collection_name    for result  in indexes:        formatted_result = {k.upper(): v for k, v in result.items()}        each_key = formatted_result.get("KEY")        ns_name = formatted_result.get("NS")        ok_index = {key: value for key, value in each_key.items()}        index_list.append(ok_index)    index_list = result = [d for d in index_list if not (isinstance(d, dict) and '_id' in d and d['_id'] == 1)]     collection_stats = db.command("collstats", collection_name)    collection_sharded = collection_stats.get("sharded", False)    if len(index_list) != 0:        logger.debug("get collection " + ns_name + " index: " +str(index_list))    #logger.info("get now In the collection " + ns_name + " sharded status: " +str(collection_sharded))    return index_list,collection_sharded  #创建集合索引def craete_index(entid,collection_name,index):    db=mongo_client[entid]    collection=db[collection_name]    logger.info("need craete index: " + entid +"."+collection_name + " : "+ str(index))         # index = (list(index.keys())[0], list(index.values())[0])    index = [(k, v) for k, v in index.items()]    result = collection.create_index(index)    logger.info("mongodb " +entid +"."+collection_name + " create index return msg: " + str(result) ) #查看对应dbname是否已经是shards,弃用def is_database_sharded(database_name):    db = mongo_client["admin"]    sharded_databases = db.command("listshards")["shards"]    for shard in sharded_databases:        if database_name in db.command("listdatabases")["databases"]:            return True    return False #创建分片索引片键def create_sharded_func(entid, collection_name, shard_key):    db = mongo_client["admin"]    collection_path = '{}.{}'.format(entid, collection_name)    logger.info("need craete sharded key : " + collection_path + " : " + str(shard_key))    sharding_colunm,sharding_type =  "",""    for key, value in shard_key.items():        sharding_colunm= key         sharding_type = value    try:        db.command('enableSharding', entid)    except  Exception as e:        logger.error("create dbname sharded key error: return: " + str(e))     try:        result = db.command('shardCollection', collection_path,key = {sharding_colunm:sharding_type})        logger.info(entid + "." + collection_path + " create sharded key return: " + str(result))    except  Exception as e:        logger.error("create sharded key error: return: " + str(e)) #读取文件获取对应索引和片键key信息def read_file_index(index_file):    index_list = []    Shard_list = []    with open(index_file, 'r') as f:        for line in f.readlines():            line = line.replace(" ", "")            #通过mongodbShard: 来区分那个片键的可以,写            # print(line)            if "mongodbShard:" not in line:                table, key_str = line.strip().split("=")                key = ast.literal_eval(key_str)                index_list.append({table: key})            else:                Shard_key_str = line.strip().split("mongodbShard:")[1]                Shard_key_str = ast.literal_eval(Shard_key_str)                Shard_list.append(Shard_key_str)    return index_list,Shard_list #获取多少天前的时间戳def get_timestamp_days_ago(get_days):    # 获取当前日期和时间    now = datetime.now()    # 减去30天    date_30_days_ago = now - timedelta(days=int(get_days))    # 将结果转换为当天的整点00:00:00    date_start_of_day  = date_30_days_ago.replace(hour=0, minute=0, second=0, microsecond=0)    # 将结果转换为时间戳    timestamp = int(date_start_of_day .timestamp())    return timestamp #判断字符串类型和长度对应返回需要删除的时间字段值def if_string_type(data_stamp):    del_timestamp = ""    get_need_del_timestamp =  get_timestamp_days_ago(int(Del_day))    if isinstance(data_stamp, str) and  len(data_stamp) == 10:        del_timestamp = str(get_need_del_timestamp)     if isinstance(data_stamp, str) and  len(data_stamp) == 13:        del_timestamp = str(get_need_del_timestamp) + "000"     if isinstance(data_stamp, int) and  len(str(data_stamp)) == 10:        del_timestamp = get_need_del_timestamp     if isinstance(data_stamp, int) and  len(str(data_stamp)) == 13:        del_timestamp = int(get_need_del_timestamp) * 1000     return del_timestamp #获取该集合中一条数据def get_one_data(entid,collection_name):    db=mongo_client[entid]    collection=db[collection_name]    Filter_conditions_key = str(need_del_table_field)    result = collection.find_one({}, {**{Filter_conditions_key: 1}, '_id': 0})    if result and Filter_conditions_key in result:        start_time_value = result.get(Filter_conditions_key)        logger.debug("get "+ entid + "." + collection_name + " Corresponding " +Filter_conditions_key + " field value: " + str(start_time_value) )        return start_time_value    else:        # logger.info("No " +Filter_conditions_key + " field found in the document. return: " + str(result) )        return False # 按照日期删除该集合中历史数据def del_data(entid,collection_name,get_del_timestamp):    db=mongo_client[entid]    collection=db[collection_name]    Filter_conditions_key = str(need_del_table_field)    Filter_conditions_value = get_del_timestamp    try:        result = collection.delete_many({Filter_conditions_key: {"$lt": Filter_conditions_value}})        logger.info(entid +" run sql: db"+"."+collection_name+".remove({"+Filter_conditions_key+ ":"+"{$lt:"+str(Filter_conditions_value) +"})")        if result.deleted_count > 0:            logger.info("By date delete " + str(entid) + "." + collection_name + " less than " + str(get_del_timestamp) + " del document count: " + str(result.deleted_count))    except Exception as e:        logger.error("Error occurred while deleting documents: " + str(e)) # 删除该集合中全部历史数据def del_all_data(entid,collection_name):    db=mongo_client[entid]    collection=db[collection_name]    try:        result = collection.delete_many({})        if result.deleted_count > 0:            logger.info(entid + " run sql: db"+"."+collection_name+".remove({})")            logger.info(entid + "." + collection_name +  " del all document count: " + str(result.deleted_count))    except Exception as e:        logger.info(entid + "." + collection_name +   " del all document error: " + str(result) ) # 备份数据def dump_mongodb_data(dbname,table,not_quiet_dump,del_time):    status_info = ["1"]    if is_del_bakcup_data:                 if os.path.exists(mongodump_command_path):            run_status = " && echo $?"            run_commnd = ""            if not_quiet_dump:                if mongodb_auth:                    #run_commnd =  mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " --authenticationDatabase=" +mongodb_auth_db + " -u " + mongodb_user + " -p " + mongodb_passwd +  " -d " + dbname + " -c " + table  + " -q '{" + need_del_table_field + ": {" +   +  "}}'"  + " -o " +  bakcup_dir_path                    run_command = f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} --authenticationDatabase={mongodb_auth_db} -u {mongodb_user} -p {mongodb_passwd} -d {dbname} -c {table} -q '{{\"{need_del_table_field}\": {{\"$lt\": \"{del_time}\"}}}}' -o {bakcup_dir_path}"                else:                    # run_commnd =  mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " -d " + dbname + " -c " + table  + " -o " +  bakcup_dir_path                    run_commnd =   f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} -d {dbname} -c {table}  -q '{{\"{need_del_table_field}\": {{\"$lt\": \"{del_time}\"}}}}' -o {bakcup_dir_path}"            else:                if mongodb_auth:                    # run_commnd =  mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " --authenticationDatabase=" +mongodb_auth_db + " -u " + mongodb_user + " -p " + mongodb_passwd +  " -d " + dbname + " -c " + table  + " -o " +  bakcup_dir_path                    run_command = f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} --authenticationDatabase={mongodb_auth_db} -u {mongodb_user} -p {mongodb_passwd} -d {dbname} -c {table}  -o {bakcup_dir_path}"                 else:                    # run_commnd =  mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " -d " + dbname + " -c " + table  + " -o " +  bakcup_dir_path                    run_commnd =   f"{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} -d {dbname} -c {table}  -o {bakcup_dir_path}"            logger.info("run command: " + run_commnd)            try:                msg = os.popen(run_commnd + run_status)                status_info = [line.strip() for line in msg.readlines()]                logger.info("mongodump command result: " + str(status_info))            except Exception as e:                logger.error("mongodump command error: " + str(e))        else:            logger.info("mongodump command file not exists ," +  mongodump_command_path)    else:        logger.debug("config file not set is_del_bakcup_data = True, not dump data")    return status_info if __name__=="__main__":    cfgpath = "./cfg/config.ini"    conf = configparser.ConfigParser()    conf.read(cfgpath)    mongodb_ip = conf.get("main", "mongodb_ip")    mongodb_port = conf.get("main", "mongodb_port")    mongodb_auth = conf.getboolean("main", "mongodb_auth")    mongodb_user = conf.get("main", "mongodb_user")    mongodb_passwd = conf.get("main", "mongodb_passwd")    mongodb_auth_db = conf.get("main", "mongodb_auth_db")    need_filter_dbname = conf.get("main", "need_filter_dbname")    is_del_bakcup_data = conf.getboolean("main", "is_del_bakcup_data")    bakcup_dir_path = conf.get("main", "bakcup_dir_path")    mongodump_command_path = conf.get("main", "mongodump_command_path")    Del_day = conf.get("main", "Del_day")    need_del_table_field = conf.get("main", "need_del_table_field")    need_del_table_list = conf.get("main", "need_del_table_list")    need_del_table_list = [item for item in need_del_table_list.split(",") if item != '']     need_del_null_table_list = conf.get("main", "need_del_null_table_list")    need_del_null_table_list = [item for item in need_del_null_table_list.split(",") if item != '']    auth_get_entid = conf.getboolean("main", "auth_get_entid")    need_filter_dbname_list = [item for item in need_filter_dbname.split(",") if item != '']         #获取配置项    all_ent_id = conf.get("main", "ent_id")    get_dbname_list = all_ent_id.split(",")    logging.config.fileConfig("./cfg/logger.conf")    logger = logging.getLogger("rotatfile")     # 初始化 MongoDB    mongo_client = init_mongodb(mongodb_auth)    if mongo_client:        logger.info("MongoDB init successfully")    else:        logger.error("Failed to initialize MongoDB")        sys.exit(10)     if auth_get_entid:        get_dbname_list = get_mongodb_dbname()        logger.info("get all dbname list: " + str(get_dbname_list))    else:        logger.info("file get dbname list: " + str(get_dbname_list))     for dbname in get_dbname_list:        get_end_all_table = get_mongodb_tables(dbname)        for table in need_del_table_list:            get_one_data_mes = get_one_data(dbname,table)            if table in get_end_all_table:                get_index_key_tables(dbname,table)            else:                logger.error(dbname + " not have table: " + table)                continue                # break            #删除按照日期数据            if get_one_data_mes:                get_del_timestmap = if_string_type(get_one_data_mes)                if dump_mongodb_data(dbname,table,True,get_del_timestmap)[0] == '0' or is_del_bakcup_data == False:                    if get_del_timestmap:                        del_data(dbname,table,get_del_timestmap)                    else:                        logger.error("get del timestmap fail")                else:                    if is_del_bakcup_data == False:                        logger.error("is_del_bakcup_data seting False, dump mongodb data fail")                    else:                        logger.error("dump mongodb data fail, but is del backup data")        for null_table in need_del_null_table_list:            if dump_mongodb_data(dbname,null_table,False,"1")[0] == '0'  or is_del_bakcup_data == False:                if null_table in get_end_all_table:                    #删除全部历史数据                    del_all_data(dbname,null_table)                else:                    logger.error( dbname +  " not have table: " + null_table)            else:                if is_del_bakcup_data == False:                    logger.error("is_del_bakcup_data seting False, dump mongodb data fail")                else:                    logger.error("dump mongodb data fail, but is del backup data")    mongo_client.close()    logger.info("MongoDB closed")配置文件篇该配置项大概使用说明支持删除指定时间前,进行数据备份在删除,根据不同配置项进行配置;同理可支持不进行备份,也可以清理删除,根据不同配置项进行配置;根据字段来查询过滤。123456789101112131415161718192021222324252627282930[DEFAULT]mongodb_ip = 10.130.47.197mongodb_port = 40000mongodb_auth = Falsemongodb_user = adminmongodb_passwd =  test@123mongodb_auth_db = admin#从全部dbname中进行过滤不需要处理的dbname,使用逗号分割need_filter_dbname = local,config,admin#指定需要按照日期删除的集合,使用逗号分割need_del_table_list = new_r_ags_e_back,call_detail_back #指定需要按照日期删除的集合字段过滤need_del_table_field = start_time#指定清空删除的集合,使用逗号分割need_del_null_table_list = call_duration_cache,duration_cache [main]#是否自动获取对应mongodb中全部dbnameauth_get_entid = False#从配置文件中获取dbnameent_id  = 20241205,20250107#需要删除多少天以前的数据Del_day = 97#是否需要备份数据is_del_bakcup_data = False#备份目录bakcup_dir_path = ./data#备份命令路径mongodump_command_path = /home/devops/Python/Mongodb_del_history/mongodump脚本运行脚本运行1234567891011121314[devops@db1 Mongodb_del_history]$ tar xf Mongodb_del_history.tar.gz[devops@db1 Mongodb_del_history]$ cd Mongodb_del_history[devops@db1 Mongodb_del_history]$ nohup ./del_history_data &2025-01-06 14:15:01 139749303605056 del_history_data.py:24 INFO init mongodb conn: 10.130.47.197:400002025-01-06 14:15:01 139749303605056 del_history_data.py:303 INFO MongoDB init successfully2025-01-06 14:15:01 139749303605056 del_history_data.py:39 INFO delete need filter dbname: local2025-01-06 14:15:01 139749303605056 del_history_data.py:310 INFO get all dbname list: ['0103290010', '0103290012', '0103290013', '0103290015']2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: jhk_task_status2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: sd_call_detail_back2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c call_duration_cache  -o ./data2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c duration_cache  -o ./data2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']2025-01-06 14:15:01 139749303605056 del_history_data.py:347 INFO MongoDB closed二进制文件程序下载使用链接下载1wget https://zhao138969.com/LinuxPackage/Python/del_history_data 
  • [技术干货] Node.js和Python进行连接与操作MongoDB的全面指南 --转载
    1. MongoDB简介1.1 什么是MongoDBMongoDB是一个基于分布式文件存储的NoSQL数据库,采用BSON(Binary JSON)格式存储数据。相较于传统的关系型数据库(如MySQL、PostgreSQL),MongoDB具有以下优势:无模式(Schema-less):数据结构灵活,字段可动态调整。高性能:支持索引、分片和副本集,适用于高并发场景。水平扩展:通过分片(Sharding)实现数据分布式存储。丰富的查询语言:支持CRUD、聚合管道、地理空间查询等。1.2 适用场景实时数据分析(如日志、用户行为分析)内容管理系统(CMS)物联网(IoT)数据存储微服务架构下的数据存储2. Node.js连接MongoDB2.1 使用官方MongoDB驱动Node.js的官方MongoDB驱动(mongodb包)提供了最基础的数据库操作能力。安装驱动1npm install mongodb基本连接与操作1234567891011121314151617181920212223const { MongoClient } = require('mongodb');  const uri = 'mongodb://localhost:27017';const client = new MongoClient(uri);  async function run() {  try {    await client.connect();    const db = client.db('mydb');    const collection = db.collection('users');      // 插入数据    await collection.insertOne({ name: 'Alice', age: 25 });      // 查询数据    const users = await collection.find({ age: { $gt: 20 } }).toArray();    console.log(users);  } finally {    await client.close();  }}  run().catch(console.error);特点✅ 轻量级,适合简单查询✅ 直接操作BSON,性能较高❌ 需要手动管理Schema2.2 使用Mongoose(ODM)Mongoose是一个基于MongoDB驱动的ODM(对象文档映射)库,提供了Schema定义、数据校验、中间件等功能。安装Mongoose1npm install mongoose定义Schema并操作123456789101112131415161718192021const mongoose = require('mongoose');  // 连接数据库mongoose.connect('mongodb://localhost:27017/mydb');  // 定义Schemaconst UserSchema = new mongoose.Schema({  name: { type: String, required: true },  age: { type: Number, default: 18 }});  // 定义Modelconst User = mongoose.model('User', UserSchema);  // 插入数据const user = new User({ name: 'Bob' });await user.save();  // 查询数据const users = await User.find({ age: { $gte: 20 } });console.log(users);特点✅ Schema管理,避免无效数据✅ 内置数据校验(如required、default)✅ 支持中间件(pre/post hooks)❌ 稍重的封装,性能略低于原生驱动3. Python连接MongoDB3.1 使用PyMongo(官方驱动)PyMongo是Python的官方MongoDB驱动,提供类似Node.js原生驱动的操作方式。安装PyMongo1pip install pymongo基本操作12345678910111213from pymongo import MongoClient  client = MongoClient("mongodb://localhost:27017/")db = client["mydb"]collection = db["users"]  # 插入数据collection.insert_one({"name": "Charlie", "age": 30})  # 查询数据users = collection.find({"age": {"$gt": 20}})for user in users:    print(user)特点✅ Pythonic API,易上手✅ 支持同步/异步(Motor)❌ 无Schema管理3.2 使用Motor(异步驱动)Motor是PyMongo的异步版本,适用于asyncio框架(如FastAPI、Tornado)。安装Motor1pip install motor异步操作示例12345678910111213141516import asynciofrom motor.motor_asyncio import AsyncIOMotorClient  async def main():    client = AsyncIOMotorClient("mongodb://localhost:27017")    db = client["mydb"]    collection = db["users"]      # 插入数据    await collection.insert_one({"name": "Dave", "age": 28})      # 查询数据    async for user in collection.find({"age": {"$gt": 25}}):        print(user)  asyncio.run(main())特点✅ 异步非阻塞,适合高并发✅ 与PyMongo API兼容❌ 需配合asyncio使用4. 高级操作4.1 索引优化索引能大幅提升查询性能,MongoDB支持单字段、复合、全文索引等。Node.js示例123456// 创建索引await collection.createIndex({ name: 1 }, { unique: true });  // 查看索引const indexes = await collection.listIndexes().toArray();console.log(indexes);Python示例123456# 创建索引collection.create_index([("name", pymongo.ASCENDING)], unique=True)  # 查看索引for index in collection.list_indexes():    print(index)4.2 聚合查询MongoDB的聚合管道(Aggregation Pipeline)支持复杂数据分析。Node.js示例1234const result = await collection.aggregate([  { $match: { age: { $gt: 20 } } },  { $group: { _id: "$name", total: { $sum: 1 } } }]).toArray();Python示例1234result = collection.aggregate([    {"$match": {"age": {"$gt": 20}}},    {"$group": {"_id": "$name", "total": {"$sum": 1}}}])5. 最佳实践1.连接池管理:避免频繁创建/关闭连接,使用长连接。2.错误处理:捕获网络异常、查询错误。3.生产环境配置:使用mongodb+srv://连接Atlas集群启用TLS加密4.性能优化:合理使用索引避免全表扫描($where)使用投影(projection)减少返回字段
  • [技术干货] 基于MongoDB实现文件的分布式存储 --转载
    一、引言当系统存在大量的图片、视频、文档等文件需要存储和管理时,对于分布式系统而言,如何高效、可靠地存储这些文件是一个关键问题。MongoDB 的 GridFS 作为一种分布式文件存储机制,为我们提供了一个优秀的解决方案。它基于 MongoDB 的分布式架构,能够轻松应对海量文件存储的挑战,同时提供了便捷的文件操作接口。二、GridFS 原理剖析GridFS 是 MongoDB 中用于存储大文件的一种规范。它将文件分割成多个较小的 chunks(默认大小为 256KB),并将这些 chunks 存储在 fs.chunks 集合中,而文件的元数据(如文件名、大小、创建时间、MIME 类型等)则存储在 fs.files 集合中。这样的设计不仅能够突破 MongoDB 单个文档大小的限制(默认 16MB),还能利用 MongoDB 的分布式特性,实现文件的分布式存储和高效读取。例如,当我们上传一个 1GB 的视频文件时,GridFS 会将其切分为约 4096 个 256KB 的 chunks,然后将这些 chunks 分散存储在不同的 MongoDB 节点上,同时在 fs.files 集合中记录文件的相关信息。三、Spring Boot 集成 GridFS在实际项目中,我们通常使用 Spring Boot 与 MongoDB 结合,下面是具体的集成步骤与代码示例。3.1 添加依赖在 pom.xml 文件中添加 Spring Boot 与 MongoDB 相关依赖:12345678910<dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-data-mongodb</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency></dependencies>3.2 配置 MongoDB 连接在 application.properties 中配置 MongoDB 的连接信息:12spring.data.mongodb.uri=mongodb://localhost:27017/fsspring.data.mongodb.database=fs3.3 编写服务类使用 GridFsTemplate 和 GridFSBucket 来实现文件的上传、下载、删除等操作:123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990@Servicepublicclass MongoFsStoreService implements FsStoreService {      privatefinal GridFsTemplate gridFsTemplate;      private GridFSBucket gridFSBucket;      public MongoFsStoreService(GridFsTemplate gridFsTemplate) {        this.gridFsTemplate = gridFsTemplate;    }      @Autowired(required = false)    public void setGridFSBucket(GridFSBucket gridFSBucket) {        this.gridFSBucket = gridFSBucket;    }      /**     * 上传文件     * @param in     * @param fileInfo     * @return     */    @Override    public FileInfo uploadFile(InputStream in, FileInfo fileInfo){        ObjectId objectId = gridFsTemplate.store(in, fileInfo.getFileId(), fileInfo.getContentType(), fileInfo);        fileInfo.setDataId(objectId.toString());        return fileInfo;    }      /**     *     * @param in     * @param fileName     * @return     */    @Override    public FileInfo uploadFile(InputStream in, String fileName) {        FileInfo fileInfo = FileInfo.fromStream(in, fileName);        return uploadFile(in, fileInfo);    }      /**     *     * @param fileId     * @return     */    @Override    public File downloadFile(String fileId){        GridFsResource gridFsResource = download(fileId);        if( gridFsResource != null ){            GridFSFile gridFSFile = gridFsResource.getGridFSFile();            FileInfo fileInfo = JsonHelper.convert(gridFSFile.getMetadata(), FileInfo.class);              try(InputStream in = gridFsResource.getInputStream()) {                return FileHelper.newFile( in, fileInfo.getFileId() ); //            } catch (IOException e) {                thrownew RuntimeException(e);            }        }        returnnull;    }      /**     * 查找文件     * @param fileId     * @return     */    public GridFsResource download(String fileId) {        GridFSFile gridFSFile = gridFsTemplate.findOne(Query.query(GridFsCriteria.whereFilename().is(fileId)));        if (gridFSFile == null) {            returnnull;        }          if( gridFSBucket == null ){            return gridFsTemplate.getResource(gridFSFile.getFilename());        }        GridFSDownloadStream downloadStream = gridFSBucket.openDownloadStream(gridFSFile.getObjectId());        returnnew GridFsResource(gridFSFile, downloadStream);    }      /**     * 删除文件     * @param fileId     */    @Override    public void deleteFile(String fileId) {        gridFsTemplate.delete(Query.query(GridFsCriteria.whereFilename().is(fileId)));    }  }3.4 创建控制器提供 REST API 接口,方便外部调用:12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182@RestController@RequestMapping("/mongo")publicclass MongoFsStoreController {      privatefinal MongoFsStoreService mongoFsStoreService;      public MongoFsStoreController(MongoFsStoreService mongoFsStoreService) {        this.mongoFsStoreService = mongoFsStoreService;    }      /**     *     * @param file     * @return     */    @RequestMapping("/upload")    public ResponseEntity<Result> uploadFile(@RequestParam("file") MultipartFile file){        try(InputStream in = file.getInputStream()){            FileInfo fileInfo = convertMultipartFile(file);            return ResponseEntity.ok( Result.ok(mongoFsStoreService.uploadFile(in, fileInfo)) );        }catch (Exception e){            return ResponseEntity.ok( Result.fail(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage()) );        }    }      private FileInfo convertMultipartFile(MultipartFile file){        FileInfo fileInfo = new FileInfo();        fileInfo.setType(FilenameUtils.getExtension(file.getOriginalFilename()));        fileInfo.setFileId(UUID.randomUUID().toString() + "." + fileInfo.getType()); //        fileInfo.setFileName(file.getOriginalFilename());        fileInfo.setSize(file.getSize());        fileInfo.setContentType(file.getContentType());        fileInfo.setCreateTime(new Date());        return fileInfo;    }      /**     *     * @param fileId     * @param response     */    @RequestMapping("/download")    public void downloadFile(@RequestParam("fileId") String fileId, HttpServletResponse response){        File file = mongoFsStoreService.downloadFile(fileId);        if( file != null ){            response.setContentType("application/octet-stream");            response.setHeader("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"");            try {                FileUtils.copyFile(file, response.getOutputStream());            } catch (IOException e) {                thrownew RuntimeException(e);            }        }    }      @RequestMapping("/download/{fileId}")    public ResponseEntity<InputStreamResource> download(@PathVariable("fileId") String fileId) throws IOException {        GridFsResource resource = mongoFsStoreService.download(fileId);        if( resource != null ){            GridFSFile gridFSFile = resource.getGridFSFile();            FileInfo fileInfo = JsonHelper.convert(gridFSFile.getMetadata(), FileInfo.class);              return ResponseEntity.ok()                    .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + fileInfo.getFileName() + "\"")                    .contentLength(fileInfo.getSize())//                    .contentType(MediaType.parseMediaType(fileInfo.getContentType()))                    .body(new InputStreamResource(resource.getInputStream()));        }//        return ResponseEntity.noContent().build();        return ResponseEntity.internalServerError().build();    }      /**     *     * @param fileId     * @return     */    @RequestMapping("/delete")    public ResponseEntity<String> deleteFile(@RequestParam("fileId") String fileId){        mongoFsStoreService.deleteFile(fileId);        return ResponseEntity.ok("删除成功");    }四、实战中的常见问题与解决方案4.1 文件下载时的内存管理在下载文件时,GridFSDownloadStream 提供了流式处理的能力,避免一次性将整个文件加载到内存中。我们可以通过 GridFsResource 将流包装后直接返回给客户端,实现边读边传,从而节省内存。例如:123// 正确:直接返回 InputStreamResource,边读边传return ResponseEntity.ok()       .body(new InputStreamResource(resource.getInputStream()));而应避免将整个文件读取到字节数组中再返回,如以下错误示例:1234// 错误:将整个文件加载到内存再返回byte[] content = resource.getInputStream().readAllBytes(); return ResponseEntity.ok()       .body(content);
  • [技术干货] MongoDB部署超详细步骤记录 --转载
    一、MongoDB安装配置1. 下载安装包12# https://www.mongodb.com/try/download/communitywget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-7.0.14.tgz2. 解压1tar fx mongodb-linux-x86_64-rhel70-7.0.14.tgz -C /usr/local/3. 创建软链接1ln -s /usr/local/mongodb-linux-x86_64-rhel70-7.0.14/ /usr/local/mongodb4. 创建数据和日志目录12mkdir /usr/local/mongodb/{data,logs}touch /usr/local/mongodb/logs/mongodb.log5. 设置环境变量123vim /etc/profileexport MONGODB_HOME=/usr/local/mongodbexport PATH=$MONGODB_HOME/bin:$PATH6. 生效环境变量1source /etc/profile7. 修改配置文件123456789101112131415vim /etc/mongodb.conf#指定数据库路径dbpath=/usr/local/mongodb/data#指定MongoDB日志文件logpath=/usr/local/mongodb/logs/mongodb.log# 使用追加的方式写日志logappend=true#端口号port=27017 #方便外网访问bind_ip=0.0.0.0fork=true # 以守护进程的方式运行MongoDB,创建服务器进程#auth=true #启用用户验证#bind_ip=0.0.0.0 #绑定服务IP,若绑定127.0.0.1,则只能本机访问,不指定则默认本地所有IP#replSet=single #开启oplog日志用于主从复制8. 启动和关闭服务1234# 启动mongod -f /etc/mongodb.conf# 关闭mongod --shutdown -f /etc/mongodb.conf9. 验证12ps -ef|grep mongodbnetstat -ntlp|grep 27017二、MongoDB Shell安装1. 下载安装包12# 下载链接:https://www.mongodb.com/try/download/shellwget https://downloads.mongodb.com/compass/mongosh-2.3.2-linux-x64.tgz2. 解压1tar fx mongosh-2.3.2-linux-x64.tgz3 . 修改命令目录1cp mongosh-2.3.2-linux-x64/bin/mongosh /usr/local/bin/4. 登录1234# 不需要认证mongosh# 需要认证mongosh mongodb://192.168.9.25:27017/admin -u "admin" -p "abc123456"三、常用命令合集1. 角色操作1)管理员角色1234567# 只能创建在admin逻辑库readAnyDatabase: 只可以把用户创建在admin逻辑库中,允许读取任何逻辑库readWriteAnyDatabase: 只可以把用户创建在admin逻辑库中,允许读写任何逻辑库dbAdminAnyDatabase: 只可以把用户创建在admin逻辑库中,允许管理任何逻辑库userAdminAnyDatabase: 只可以把用户创建在admin逻辑库中,允许管理任何逻辑库用户clusterAdmin: 只可以把用户创建在admin逻辑库中,允许管理MongoDB集群root: 只可以把用户创建在admin逻辑库中,超级管理员,拥有最高权限2)普通角色12345# 在指定逻辑库上创建Read: 允许用户读取指定逻辑库readWrite: 允许用户读写指定逻辑库dbAdmin: 可以管理指定的逻辑库userAdmin: 可以管理指定逻辑库的用户3)创建角色123456# 创建管理员use admindb.createUser({user:"admin",pwd:"abc123456",roles:[{role:"root",db:"admin"}]})# 创建普通角色use commondb.createUser({user:"qyc",pwd:"abc123456",roles:[{role:"dbAdmin",db:"common"},{role:"readWrite",db:"common"}]})4)查询角色123456# 查询所有db.system.users.find().pretty()show users# 查询指定角色db.getUser('qyc')db.runCommand({usersInfo:"qyc"})5) 更新角色1db.updateUser('qyc',{'roles':[{'role':'userAdmin','db':'common'},{'role':'read','db':'common'}]})6) 修改角色密码1db.changeUserPassword("qyc", "123456")7) 删除角色1db.dropUser('qyc')8) 角色认证1db.auth('qyc','123456')2. 数据库操作1)查看所有库1show dbs2) 切换库12# 切换到指定库,不存在会自动创建use common3)查看当前库1db4)删除当前库1db.dropDatabase()3. 集合操作1)创建集合1db.createCollection("student")2)查看集合1show collections3)重命名集合1db.student.renameCollection("stu")4) 查看集合记录数量1db.student.count()5) 查看集合数据空间容量12345# db.student.dataSize()# 查看集合总大小(字节为单位)db.student.totalSize()# 查看集合的统计信息db.student.stats()6) 删除集合1db.student.drop()4. 文档操作1)在集合中插入文档123456# 插入单条db.student.insertOne({name:"Scott",sex:"male",age:25,city:"Beijing"})# 插入多条,save在_id主键存在就更新,不存在就插入db.student.insert([{name:"Scott3",sex:"male",age:22,city:"Beijing"},{name:"Scott2",sex:"male",age:22,city:"Beijing"}])db.student.insertMany([{name:"Scott3",sex:"male",age:22,city:"Beijing"},{name:"Scott2",sex:"male",age:22,city:"Beijing"}])db.student.save([{name:"Scott3",sex:"male",age:22,city:"Beijing"},{name:"Scott2",sex:"male",age:22,city:"Beijing"}])2)更新文档12345678# 修改一条记录db.student.update({name:"Scott2"},{$set:{age:26,classno:"2-6"}})# 修改多条记录db.student.updateMany({name:"Scott3"},{$set:{classno:"2-7"}})# 在age属性上都加2db.student.updateMany({},{$inc:{age:2}})# 向数组属性添加元素db.student.update({name:"Scott"},{$push:{role:"班长"}})3)从文档主键ID中提取时间1ObjectId("66dac03ddf68fdd4c95796d4").getTimestamp()4) 删除文档12345678# 删除文档中的字段,{}代表修改所有db.student.update({name:"Scott"},{$unset:{classno:"2-6"}})# 删除数组中的某个元素db.student.update({name:"Scott"},{$pull:{role:"班长"}})# 删除所有文档db.student.remove({})# 删除指定文档db.student.remove({name:"Scott2"})5) 简单查询表达式说明$lt小于$gt大于$lte小于等于$gte大于等于$in包括$nin不包括$ne不等于$all全部匹配$not取反$or或$exists含有字段12345678910111213# 查询所有文档db.student.find()# 查询指定文档,并显示指定字段,1为显示,0不显示db.student.find({name:"Scott3",classno:"2-8"},{name:1,_id:0})db.student.find({age:{$gte:24}})db.student.find({name:/^S/})# 查看单条记录db.student.findOne({name:/3$/})# 文档嵌套查询# {class:{type: 1, data: [1,2,3]}}db.student.find({data.class.type:1})db.student.find({data.class.data.1:2})db.student.find({data.class.data:[1,2,3]})6)分页查询1234# 取前十条db.student.find().limit(10)# 从21条开始取十条db.student.find().skip(20).limit(10)7) 文档排序12# 数据排序(1代表升序,-1代表降序)db.student.find().sort({name:1})8) 文档去重123456# 返回去重后指定数据,格式为数组db.student.distinct("name")# 为去重后数据排序,(-1为正序,1为倒序)db.student.distinct("name").sort(function(){return -1})# 截取数组中指定数据,(0,5)表示截取从第一行到第六行,(5)表示截取第六行到最后一行db.stuent.distinct("name").slice(0,5)5. 索引操作1) 创建索引123# 1升序,-1降序,background代表在空闲时创建db.student.createIndex({name:1})db.student.createIndex({name:-1},{background:true,name:"index_name"})2) 创建唯一性索引1db.student.createIndex({sid:1},{background:true,unique:true})3) 查看索引1db.student.getIndexes()4) 删除索引1db.student.dropIndexes()四、备份与恢复1. 全库备份1mongodump --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -o /data2. 备份逻辑库12# --dumpDbUsersAndRoles参数可以备份隶属于逻辑库的用户mongodump --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -d school -o /data3. 备份集合数据1234# --gzip压缩备份,--oplog使用oplog进行时间点快照mongodump --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -d school -c student -o /data# 数据导出JSON或CSV格式数据,-f指定输出字段,-q指定查询语句mongoexport --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -d school -c student -f "_id,name,sex,age" -o student.json4. 单库恢复12# --drop表示导入前删除数据库中集合mongorestore --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin --drop -d school /data/school5. 集合恢复1234# --gzip解压Gzip压缩存档还原,--oplogReplay重放oplog.bson中的操作内容,--oplogLimit与--oplogReplay一起使用时,可以限制重放到指定的时间点mongorestore --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -d school -c student /data/school/student.bson# 导入json数据mongoimport --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -d school -c student --file student.json6. 增量恢复123456789101112# 使用oplog参数全备,需要开启副本集mongodump --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin --oplog -o /data# 解析oplog文件,找出全备最后一个时间的数据bsondump /data/oplog.bson > /data/oplog.json# 导出增量数据,这里修改为oplog.json最近一行的时间戳mongodump --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin -d local -c oplog.rs -q '{ts:{$gt:Timestamp(1610789118,416)}}' -o /data/oplog# 恢复全备mongorestore --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin --oplogReplay /data# 复制增量的oplog到备份目录,重命名为oplog.bson,将原来的oplog.bson覆盖cp oplog.rs.bson /data/oplog.bson# 将增量的oplog进行恢复,指定要恢复的时间点mongorestore --host=localhost --port=27017 -u admin -p abc123456 --authenticationDatabase=admin --oplogReplay --oplogLimit "1610789168:1" /data
  • [技术干货] Mongo Shell 执行环境的基本操作 --转载
    Mongo Shell 是 MongoDB 的交互式 JavaScript shell,用于与 MongoDB 数据库进行交互。一、启动与连接1. 启动方式在安装了 MongoDB 的系统中,打开命令行终端,输入`mongo`命令即可启动 Mongo Shell。如果 MongoDB 的可执行文件路径不在系统环境变量中,则需要先进入 MongoDB 的安装目录下的`bin`文件夹,再执行`mongo`命令。如果数据库没启动,输入 `mongod` 启动数据库。2. 连接数据库默认情况下,Mongo Shell 会尝试连接到本地运行的 MongoDB 实例,端口为 27017。也可以在启动时指定连接的主机、端口和数据库等信息,例如`mongo mongodb://localhost:27017/mydb`,其中`mydb`是要连接的数据库名称。二、基本操作1. 数据库操作use: 切换或创建数据库db: 查看当前连接的数据库show dbs: 查看所有数据库2. 集合操作db.createCollection('mycollection'): 命令创建集合show collections: 查看当前数据库中的所有集合db.mycollection.insert(): 增db.mycollection.find(): 查db.mycollection.update(): 改db.mycollection.remove():删三、数据类型1. 支持的数据类型Mongo Shell 支持多种数据类型,包括字符串、数字、日期、数组、对象等。123456789db.mycollection.insert({  name: "John Doe",  age: 30,  hobbies: ["reading", "coding"],  address: {    street: "123 Main St",    city: "Anytown",  },});四、脚本编写与执行1. 编写脚本可以在 Mongo Shell 中直接编写 JavaScript 脚本,实现复杂的数据库操作逻辑。123function findUsersByAge(age) {  return db.users.find({ age: age });}2. 执行脚本文件也可以将 Mongo Shell 脚本保存为`.js`文件,然后在命令行中使用`mongo <script.js>`的方式来执行脚本,其中`<script.js>`是脚本文件的路径。五、权限与认证1. 权限管理MongoDB 支持用户权限管理,不同用户具有不同的权限来访问和操作数据库。在 Mongo Shell 中,可以使用`db.createUser()`命令创建用户,并使用`db.grantRolesToUser()`命令为用户授予角色和权限。2. 认证方式当 MongoDB 开启认证后,连接数据库时需要提供用户名和密码进行认证。可以在连接字符串中指定用户名和密码,如`mongo mongodb://username:password@localhost:27017/mydb`。六、与编程语言的交互1. 与 Node.js 交互在 Node.js 应用中,可以使用`mongodb`模块来连接和操作 MongoDB 数据库,其操作方式与 Mongo Shell 有很多相似之处。通过`mongodb`模块,可以在 Node.js 中执行与 Mongo Shell 类似的数据库操作,实现数据的存储、查询和更新等功能。2. 与 Python 交互在 Python 中,可以使用`pymongo`库来与 MongoDB 进行交互。`pymongo`提供了类似于 Mongo Shell 的操作方法,使得在 Python 中可以方便地操作 MongoDB 数据库,如插入数据、查询数据和更新数据等。
  • [技术干货] 映射MongoDB _id字段的几种常见方式 --转载
    在 Spring Data MongoDB 中,将 Java POJO 的字段映射到 MongoDB 文档的 _id 字段非常直接,主要通过 @Id 注解(org.springframework.data.annotation.Id)来完成。以下是映射 MongoDB _id 字段的几种常见方式和关键点:1、使用 String 类型作为 ID (最常见):当 @Id 注解的字段类型是 String 时,Spring Data MongoDB 会将其视为 MongoDB ObjectId 的字符串表示形式。如果在保存新文档时此 String 字段为 null,MongoDB Java 驱动程序会自动生成一个新的 ObjectId,然后 Spring Data MongoDB 会将其转换为字符串并赋值给该字段。这是最推荐和最方便的方式,因为字符串形式的 ID 更容易在 API、URL 和日志中使用。12345678910111213141516171819202122232425262728import org.springframework.data.annotation.Id;import org.springframework.data.mongodb.core.mapping.Document; @Document(collection = "my_entities")public class MyEntity {     @Id    private String id; // 将映射到 MongoDB 的 _id 字段     private String name;     // Constructors, getters, setters    public String getId() {        return id;    }     public void setId(String id) {        this.id = id;    }     public String getName() {        return name;    }     public void setName(String name) {        this.name = name;    }}当你保存一个新的 MyEntity 实例且 id 字段为 null 时:1234MyEntity entity = new MyEntity();entity.setName("Test Entity");mongoTemplate.save(entity); // entity.getId() 现在会有一个自动生成的 ObjectId 字符串System.out.println(entity.getId()); // 例如:"60c72b941f4b1a3e4c8e4f3a"2、使用 org.bson.types.ObjectId 类型作为 ID:你可以直接使用 MongoDB BSON库提供的 ObjectId 类型。同样,如果在保存新文档时此 ObjectId 字段为 null,驱动程序会自动生成一个新的 ObjectId。1234567891011121314151617181920212223242526272829import org.bson.types.ObjectId;import org.springframework.data.annotation.Id;import org.springframework.data.mongodb.core.mapping.Document; @Document(collection = "products")public class Product {     @Id    private ObjectId id; // 直接使用 ObjectId 类型     private String productName;     // Constructors, getters, setters    public ObjectId getId() {        return id;    }     public void setId(ObjectId id) {        this.id = id;    }     public String getProductName() {        return productName;    }     public void setProductName(String productName) {        this.productName = productName;    }}3、使用其他 Java 类型作为 ID (例如 Long, BigInteger):你也可以使用其他原始类型或对象类型(如 Long, BigInteger)作为 _id。重要: 如果使用这些类型,MongoDB 不会自动为你生成 ID。你必须在保存文档之前自己提供一个唯一的 ID 值。如果插入时该字段为 null (对于对象类型) 或默认值 (对于原始类型且你未设置),可能会导致错误或意外行为,具体取决于驱动程序和服务器版本。这种方式适用于你有外部系统生成 ID,或者 ID 具有特定业务含义的情况。123456789101112131415161718192021222324252627282930313233import org.springframework.data.annotation.Id;import org.springframework.data.mongodb.core.mapping.Document;import java.math.BigInteger; @Document(collection = "items")public class Item {     @Id    private Long itemId; // 使用 Long 类型,需要自己保证唯一性并赋值     // 或者    // @Id    // private BigInteger itemId; // 使用 BigInteger,需要自己保证唯一性并赋值     private String description;     // Constructors, getters, setters    public Long getItemId() {        return itemId;    }     public void setItemId(Long itemId) {        this.itemId = itemId;    }     public String getDescription() {        return description;    }     public void setDescription(String description) {        this.description = description;    }}使用自定义ID时:1234Item item = new Item();item.setItemId(12345L); // 必须手动设置 IDitem.setDescription("Custom ID Item");mongoTemplate.save(item);4、Java 字段名不一定是 “id”:被 @Id 注解的 Java 字段的名称可以不是 id。Spring Data MongoDB 依然会将其映射到 MongoDB 文档中的 _id 字段。123456789101112131415161718192021import org.springframework.data.annotation.Id;import org.springframework.data.mongodb.core.mapping.Document; @Document(collection = "books")public class Book {     @Id    private String bookIsbn; // Java 字段名为 "bookIsbn",但它映射到 MongoDB 的 _id     private String title;     // Getters and setters    public String getBookIsbn() {        return bookIsbn;    }     public void setBookIsbn(String bookIsbn) {        this.bookIsbn = bookIsbn;    }    // ...}关键点总结:@Id 注解: 是将 Java 字段标记为 MongoDB _id 的核心。自动生成:仅当 @Id 字段类型为 String 或 org.bson.types.ObjectId,并且在插入新文档时该字段值为 null 时,ID 才会由 MongoDB 驱动自动生成。其他类型(如 Long, Integer, BigInteger)需要你在应用程序中手动赋值并确保其唯一性。不可变性: MongoDB 中的 _id 字段一旦设置,就不能被修改。尝试更新 _id 会导致操作失败或创建一个新文档(取决于操作类型)。唯一性: _id 在其集合中必须是唯一的。MongoDB 会自动为 _id 字段创建唯一索引。@Field("_id"): 通常不需要。@Id 注解本身就隐含了该 Java 字段映射到 BSON 文档的 _id 键。显式使用 @Field("_id") 是多余的。选择哪种 ID 类型取决于你的具体需求:String (ObjectId 字符串): 通用,方便,推荐用于大多数场景。ObjectId: 如果你需要在 Java 代码中直接操作 ObjectId 对象(例如,获取时间戳部分)。Long / BigInteger / 其他自定义类型: 当 ID 有特定业务含义或由外部系统生成时。在大多数 Spring Boot 应用中,使用 String 类型并让 MongoDB 自动生成 ObjectId 是最简单和最常见的做法。
  • [技术干货] MongoDB 的批量查找符号的方法 --转载
    一、`$in` 操作符1. 功能`$in` 操作符用于匹配字段值等于指定数组中任意值的文档,能批量查找多个特定值的文档。2. 语法示例1db.collection.find({ field: { $in: [value1, value2, ...] } });3. 代码示例假设有个名为 `users` 的集合,里面存有用户信息,若要查找 `age` 为 20、25 或 30 的用户,可使用以下代码:1db.users.find({ age: { $in: [20, 25, 30] } });二、`$nin` 操作符1. 功能`$nin` 是 `$in` 的反操作符,用于匹配字段值不在指定数组中的文档,可批量排除多个特定值的文档。2. 语法示例1db.collection.find({ field: { $nin: [value1, value2, ...] } });3. 代码示例还是在 `users` 集合中,若要查找 `age` 不是 20、25 或 30 的用户,可使用以下代码:1db.users.find({ age: { $nin: [20, 25, 30] } });三、`$or` 操作符1. 功能`$or` 操作符能将多个查询条件组合起来,只要满足其中一个条件的文档就会被返回,可用于批量查找符合多个不同条件的文档。2. 语法示例1234567db.collection.find({    $or: [        { condition1 },        { condition2 },        ...    ]});3. 代码示例在 `users` 集合中,若要查找 `age` 为 20 或者 `name` 为 "John" 的用户,可使用以下代码:123db.users.find({  $or: [{ age: 20 }, { name: "John" }],});四、`$and` 操作符1. 功能`$and` 操作符将多个查询条件组合,文档必须同时满足所有条件才会被返回,常用于批量查找同时符合多个条件的文档。2. 语法示例1234567db.collection.find({    $and: [        { condition1 },        { condition2 },        ...    ]});3. 代码示例在 `users` 集合中,若要查找 `age` 大于 20 且 `name` 为 "John" 的用户,可使用以下代码:123db.users.find({  $and: [{ age: { $gt: 20 } }, { name: "John" }],});五、范围查询操作符(`$gt`、`$lt`、`$gte`、`$lte`)1. 功能这些操作符能批量查找字段值在某个范围内的文档,`$gt` 表示大于,`$lt` 表示小于,`$gte` 表示大于等于,`$lte` 表示小于等于。2. 语法示例12345678// 大于db.collection.find({ field: { $gt: value } });// 小于db.collection.find({ field: { $lt: value } });// 大于等于db.collection.find({ field: { $gte: value } });// 小于等于db.collection.find({ field: { $lte: value } });3. 代码示例在 `users` 集合中,若要查找 `age` 大于 20 且小于 30 的用户,可使用以下代码:1db.users.find({ age: { $gt: 20, $lt: 30 } });
  • [技术干货] MongoDB中单对象大小超16M的存储方案 --转载
    1. 使用 GridFS适用场景:需要存储大文件(如图像、视频、文档等)。原理MongoDB 的 GridFS 是一种专门用于存储超过 16MB 文件的工具。它会将大文件分割成多个 chunk(默认大小 255KB),并存储在两个集合中:fs.files:存储文件的元数据(如文件名、大小、类型等)。fs.chunks:存储文件的内容分块。实现步骤存储大文件 使用 MongoDB 驱动的 GridFS 工具存储文件。Python 示例:12345678910from pymongo import MongoClientfrom gridfs import GridFS  client = MongoClient("mongodb://localhost:27017")db = client.myDatabasefs = GridFS(db)  # 存储文件with open("large_file.bin", "rb") as f:    fs.put(f, filename="large_file.bin")读取大文件1234# 读取文件file_data = fs.get_last_version(filename="large_file.bin")with open("output.bin", "wb") as f:    f.write(file_data.read())2. 将文档拆分为多个小文档适用场景:文档包含大量嵌套数据,导致总大小超过 16MB。解决思路将大文档拆分成多个子文档。使用字段(如 _id 或 parentId)将这些子文档关联起来。实现步骤示例:拆分用户日志记录 原始大文档(超 16MB):1{ "_id": "user1", "logs": [ { "timestamp": "2025-01-01", "action": "login" }, ... ] }拆分为多个小文档:1234567// 主文档 { "_id": "user1", "type": "userMetadata" } // 子文档 { "parentId": "user1", "logs": [ { "timestamp": "2025-01-01", "action": "login" }, ... ] }查询时合并:123db.metadata.find({ _id: "user1" }); db.logs.find({ parentId: "user1" });3. 使用 BSON 对象数组存储引用适用场景:需要在文档中存储大量关联对象。解决思路将大数组分割到其他集合中,主文档存储引用。示例大文档超限前:1{ "_id": "project1", "name": "Big Project", "tasks": [ /* 超大量任务数据 */ ] }优化后:1234// 主文档 { "_id": "project1", "name": "Big Project" } // 任务文档 { "projectId": "project1", "taskId": 1, "taskName": "Task 1", ... }查询时通过 projectId 关联:1db.projects.find({ _id: "project1" }); db.tasks.find({ projectId: "project1" });4. 压缩数据适用场景:文档中包含重复数据或可压缩结构(如 JSON 数据)。解决思路在存储之前压缩数据(例如使用 GZIP、Zlib 等)。查询时解压数据。示例Python 实现:123456789101112131415import zlibfrom pymongo import MongoClient  client = MongoClient("mongodb://localhost:27017")db = client.myDatabasecollection = db.myCollection  # 压缩存储data = {"key": "value" * 10000}compressed_data = zlib.compress(str(data).encode("utf-8"))collection.insert_one({"_id": "compressed_doc", "data": compressed_data})  # 解压读取doc = collection.find_one({"_id": "compressed_doc"})decompressed_data = zlib.decompress(doc["data"]).decode("utf-8")5. 修改数据结构适用场景:文档设计冗余或结构不合理。解决思路简化嵌套层级。使用更紧凑的数据类型(如数组代替对象)。优化前:1{ "_id": "order1", "customer": { "id": 1, "name": "John Doe" }, "items": [ { "productId": "p1", "productName": "Product 1", "quantity": 2 } ] }优化后:1{ "_id": "order1", "customerId": 1, "items": [ { "p": "p1", "q": 2 } ] }6. 使用文件系统或其他存储服务适用场景:非结构化大数据(如媒体文件、大型JSON)。解决思路将大数据存储到文件系统、Amazon S3、Azure Blob 等。在 MongoDB 中存储文件路径或 URL。
  • [技术干货] MongoDB Schema设计进阶
    MongoDB的灵活Schema设计是其核心优势之一,但随着业务发展,数据模型的管理和演进成为关键挑战。本文将系统性地介绍MongoDB Schema版本控制与迁移的最佳实践,帮助开发者在保持灵活性的同时实现数据模型的可控演进。一、MongoDB Schema设计原则1. 灵活性与约束的平衡MongoDB虽然支持无固定Schema,但生产环境仍需合理约束:// 使用JSON Schema定义验证规则 db.createCollection("products", { validator: { $jsonSchema: { bsonType: "object", required: ["name", "price"], properties: { name: { bsonType: "string" }, price: { bsonType: "decimal" }, tags: { bsonType: "array", items: { bsonType: "string" } } } } } }) 关键考量:必填字段必须明确声明数据类型需要合理约束嵌套文档深度建议不超过3层数组元素类型应该统一2. 关系建模策略MongoDB处理关系的三种典型方式:策略适用场景示例内嵌文档一对一或一对少关系用户档案包含地址引用ID一对多或多对多关系订单引用产品ID混合模式复杂关系博客文章内嵌评论+引用作者某社交平台实测数据:采用混合模式后,用户主页查询性能提升6倍,从320ms降至52ms。二、Schema版本控制模式1. 版本字段方案最简单的版本控制方式是在文档中添加版本标识:// 文档结构示例 { _id: ObjectId("..."), schemaVersion: 2, // 版本标识 title: "MongoDB指南", author: { name: "张三", email: "zhang@example.com" }, // V2新增字段 seoKeywords: ["数据库","NoSQL"] } 实现要点:版本号使用整数递增(1,2,3…)应用代码根据版本号处理不同结构旧版本文档可延迟迁移2. 历史集合方案维护专门的历史集合存储旧版本文档:// 当前数据集合 db.articles.insert({ _id: ObjectId("..."), title: "最新标题", content: "更新内容" }) // 历史版本集合 db.articles_history.insert({ originalId: ObjectId("..."), version: 1, title: "旧标题", content: "原始内容", archivedAt: ISODate("2025-06-01") }) 优势:完整保留历史记录支持数据回滚和审计查询当前数据不受影响某CMS系统采用此方案后,数据追溯查询性能提升80%,历史版本查询平均响应时间从1200ms降至220ms。3. 增量变更日志方案记录Schema变更操作而非完整文档:// 变更日志集合 db.schema_changelog.insert({ collection: "products", field: "specifications", changeType: "ADD", versionFrom: 1, versionTo: 2, changedAt: ISODate("2025-06-24"), changedBy: "admin@example.com" }) 适用场景:需要严格审计的金融系统多团队协作的大型项目频繁Schema变更的敏捷开发三、Schema迁移策略1. 渐进式迁移实现步骤:应用兼容新旧Schema逐步更新旧文档最终移除兼容代码// 查询时处理多版本 function getProduct(id) { const doc = db.products.findOne({_id: id}); if (doc.schemaVersion === 1) { // 将V1转换为V2格式 return { ...doc, specifications: [], schemaVersion: 2 }; } return doc; } 性能数据:百万级文档迁移通常需要2-6小时对生产系统性能影响<5%可结合闲时任务调度2. 批量迁移方案使用聚合管道实现高效批量更新:// 将V1产品文档迁移到V2 db.products.aggregate([ { $match: { schemaVersion: 1 } }, { $addFields: { specifications: [], lastUpdated: new Date(), schemaVersion: 2 }}, { $merge: "products" } ], { allowDiskUse: true }) 优化技巧:添加{ allowDiskUse: true }避免内存溢出分批执行(每批5-10万文档)在从库执行后切换主从3. 零停机迁移策略实施流程:双写新旧格式后台迁移旧数据验证数据一致性切换读操作到新Schema清理旧格式数据某电商平台采用此方案,在"双11"期间完成了用户Schema迁移,全程零投诉。四、版本控制工具链1. Mongoose版本控制插件const versioning = require('mongoose-versioning'); const productSchema = new mongoose.Schema({ name: String, price: Number }); // 添加版本控制插件 productSchema.plugin(versioning, { collection: 'products_versions', strategy: 'interval', interval: 86400 // 每天最多一个版本 }); 功能特性:自动维护历史版本支持版本差异比较可配置版本保留策略2. Liquibase for MongoDB<!-- 变更日志示例 --> <changeSet id="20240624-1" author="dev"> <addColumn collectionName="products"> <column name="seoKeywords" type="array"/> </addColumn> <update collectionName="products"> <field name="schemaVersion" value="2"/> </update> </changeSet> 企业级功能:变更集原子应用回滚脚本支持多环境配置管理五、生产环境最佳实践1. 变更管理流程设计评审:团队评审Schema变更影响版本测试:在预发布环境验证备份策略:执行前全量备份监控指标:// 监控迁移进度 db.products.aggregate([ { $group: { _id: "$schemaVersion", count: { $sum: 1 } }} ]) 回滚方案:准备应急回滚脚本2. 性能优化建议索引策略:为版本字段添加索引db.products.createIndex({ schemaVersion: 1 }) 查询优化:使用$match尽早过滤版本存储规划:历史数据使用压缩存储3. 文档规范标准变更日志:记录每个版本的修改内容示例文档:维护各版本的示例文档API适配:明确标注API支持的版本范围六、未来演进方向AI辅助设计:基于查询模式推荐Schema预测变更影响自动生成迁移脚本云原生集成:Kubernetes Operator管理Schema变更跨集群同步Schema定义自动回滚异常变更实时分析:变更影响实时监控性能差异即时对比异常模式自动检测MongoDB的Schema管理是一门平衡艺术,需要在灵活性与稳定性之间找到最佳平衡点。通过合理的版本控制和迁移策略,团队可以充分利用MongoDB的Schema灵活性,同时避免数据混乱和技术债务。随着工具链的不断完善,Schema管理正变得越来越自动化,但理解其核心原理仍是每个MongoDB开发者的必备技能。
  • [技术干货] MongoDB分片集群设计精要
    MongoDB分片集群是实现水平扩展的核心架构,而分片键的选择直接决定了集群的性能上限与扩展能力。本文将深入剖析分片键设计方法论,揭示常见陷阱,并提供生产级解决方案。一、分片集群架构深度解析1. 核心组件协同机制MongoDB分片集群由三大关键组件构成精密协作体系:分片节点(Shard):实际数据存储单元,建议3个以上副本集配置服务器(Config Server):存储元数据的特殊副本集(必须3节点)查询路由(mongos):无状态路由进程,建议与应用服务器同机部署数据流向示意图:应用请求 → mongos → 配置服务器 → 分片节点 → 返回结果2. 分片数据分布原理MongoDB通过分片键范围分区实现数据分布:系统将分片键值空间划分为多个块(chunk)默认块大小为64MB(可调整)当块超过阈值时触发自动分裂(split)均衡器(balancer)负责跨分片迁移块某电商平台实测数据:10亿条订单数据分布在16个分片上,平均每个分片承载62.5M条记录,最大分片数据偏差仅3.2%。二、分片键选择四维评估体系1. 基数维度(Cardinality)理想特性:高基数分片键(如用户ID、订单号)反例分析:// 低基数分片键导致分布不均 { gender: "male" } // 仅2-3个可能值 优化方案:组合高基数字段:sh.shardCollection("db.orders", { customerId: 1, orderDate: 1 }) 2. 写分布维度(Write Distribution)热点问题诊断:db.orders.getShardDistribution() // 输出显示80%写入集中在某个分片 解决方案:使用哈希分片键:sh.shardCollection("db.logs", { _id: "hashed" }) 采用复合分片键:sh.shardCollection("db.sensors", { sensorType: 1, timestamp: 1 }) 3. 查询模式维度(Query Patterns)最佳实践:分片键应覆盖80%以上高频查询条件避免跨分片查询(scatter-gather)查询效率对比:查询条件分片键匹配性能{orderId:123}是5ms{userId:456}否350ms4. 增长模式维度(Growth Pattern)时间序列数据特例:// 单调递增分片键导致热点 sh.shardCollection("db.logs", { _id: 1 }) // _id包含时间戳 优化方案:反向时间戳(1 - timestamp)哈希与范围组合:sh.shardCollection("db.iot", { deviceId: 1, hashedTime: "hashed" }) 三、生产环境分片策略实战1. 金融交易系统案例需求特征:每日新增5000万笔交易80%查询按accountId+date范围查询需要保证同一账户交易局部性分片方案:sh.shardCollection("bank.transactions", { accountId: 1, transactionDate: -1 // 倒序便于最新交易查询 }) 效果:写入均匀分布在16个分片(最大偏差7%)账户历史查询命中单个分片比例达92%2. 物联网平台案例数据特征:10万设备每分钟上报数据设备ID前缀包含地域信息需要按地域聚合分析创新方案:// 使用带前缀的哈希分片键 sh.shardCollection("iot.measurements", { region: 1, hashedDeviceId: "hashed" }) 优势:相同地域数据物理相邻哈希分量保证均匀分布区域查询只需扫描少数分片四、热点问题诊断与调优1. 监控指标体系关键命令:// 查看分片分布情况 db.collection.getShardDistribution() // 监控均衡器状态 sh.isBalancerRunning() // 分析查询路由 db.currentOp(true).inprog.forEach( op => printjson(op.shardVersion) ) 核心监控项:分片间数据量差异(应<10%)块迁移频率(正常<5次/小时)查询命中分片数(理想为1)2. 动态调整策略分片键重构方案:创建新空集合并设置新分片键使用$out阶段逐步迁移数据应用双写过渡期最终切换读操作块大小调优:// 临时增大块大小加速初始加载 use config db.settings.update( { _id: "chunksize" }, { $set: { value: 128 } } ) 五、高级技巧与未来演进1. 分片区域(Zone)策略跨地域部署优化:// 定义区域 sh.addShardTag("shard001", "US-EAST") sh.addShardTag("shard002", "EU-WEST") // 分配数据范围 sh.addTagRange("orders.usa", { zipCode: "00000" }, { zipCode: "99999" }, "US-EAST" ) 2. 弹性分片架构云原生最佳实践:按需自动扩展分片(如AWS M10→M30)利用Kubernetes实现无感知扩缩容基于负载预测的预分片策略3. 混合分片模式时间序列数据特殊处理:热数据:高性能分片(内存优化)温数据:均衡分片(SSD存储)冷数据:归档分片(HDD存储)六、分片集群管理黄金法则预分片原则:提前创建足够多的空块(splitAt())监控三要素:均衡状态、查询路由、性能基线变更窗口期:在业务低峰执行分片维护容量规划:每个分片预留30%增长空间回退方案:始终备份未分片的数据快照MongoDB分片集群设计是一门平衡的艺术,优秀的分片键需要在数据分布、查询性能和写入吞吐之间取得完美平衡。随着MongoDB 6.0引入弹性分片和更智能的均衡器,分片集群正变得越来越自动化。但无论如何演进,理解分片键设计的核心原则始终是构建高性能分布式系统的基石。