首页/文章列表/文章详情

mongo变更流使用及windows下副本集五分钟搭建

编程知识2092024-08-10评论

mongodb的变更流解释:

变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪  oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。(Change Streams - MongoDB Manual v5.0)

使用场景,需要websocket推送实时数据的时候,我们把数据写入mongo的同时,websocket实时监听mongo数据,拿到后推送到订阅组用户。

这里只做一端新增另一端服务监听测试,及windows下副本集快速搭建流程。

 

sub端代码

package main

import (
	"context"
	"fmt"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"log"
)

func main() {
	// 设置 MongoDB 客户端mongo单机模式不支持这种监听 单机报错 2024/08/10 11:18:54 (Location40573) The $changeStream stage is only supported on replica sets
	clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
	client, err := mongo.Connect(context.TODO(), clientOptions)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(context.TODO())

	// 获取数据库和集合
	collection := client.Database("testdb").Collection("items")

	// 设置 Change Stream
	pipeline := mongo.Pipeline{}
	changeStreamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
	changeStream, err := collection.Watch(context.TODO(), pipeline, changeStreamOptions)
	if err != nil {
		log.Fatal(err)
	}
	defer changeStream.Close(context.TODO())

	fmt.Println("开始监听 Change Stream...")

	// 读取 Change Stream
	for changeStream.Next(context.TODO()) {
		var changeEvent bson.M
		if err := changeStream.Decode(&changeEvent); err != nil {
			log.Fatal(err)
		}

		fmt.Printf("检测到更改: %+v\n", changeEvent)
	}

	if err := changeStream.Err(); err != nil {
		log.Fatal(err)
	}
}

 

pub端代码

package mainimport ( "context""fmt""time""go.mongodb.org/mongo-driver/bson""go.mongodb.org/mongo-driver/mongo""go.mongodb.org/mongo-driver/mongo/options")func main() { // 设置 MongoDB 客户端 clientOptions := options.Client().ApplyURI("mongodb://localhost:27017") client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { fmt.Println("连接 MongoDB 失败:", err) return } defer client.Disconnect(context.TODO()) //获取数据库和集合 collection := client.Database("testdb").Collection("items")//插入数据for i := 1; i <= 5; i++ { item := bson.D{{"name", fmt.Sprintf("item%d", i)}, {"value", i}} _, err := collection.InsertOne(context.TODO(), item) if err != nil { fmt.Println("插入数据失败:", err) return}//fmt.Printf("插入数据: %+v\n", item)fmt.Printf("插入数据第 %d 条", i) time.Sleep(2 * time.Second) //模拟一些延迟}}

执行结果 pub端

 执行结果 sub端

 

数据库不用新建集合,自动生成很方便

 

 

 

下面是windows下安装副本集步骤一字不拉

https://www.mongodb.com/try/download/community 下载zip包解压 bin目录同级创建data-data4(data内部需要创建好db目录),log-log4 MongoDB shell version v5.0.28 注意 data目录下没有db文件夹net start MongoDB执行服务起不来 192.168.2.6本机ipmongod.exe--config"E:\mongodb\mongod.conf"--serviceName"MongoDB"--serviceDisplayName"MongoDB"--installmongod.exe--config"E:\mongodb\mongod1.conf"--serviceName"MongoDB1"--serviceDisplayName"MongoDB1"--installmongod.exe--config"E:\mongodb\mongod2.conf"--serviceName"MongoDB2"--serviceDisplayName"MongoDB2"--installmongod.exe--config"E:\mongodb\mongod3.conf"--serviceName"MongoDB3"--serviceDisplayName"MongoDB3"--installnet start MongoDBnet start MongoDB1net start MongoDB2net start MongoDB3bin目录下打开cmd执行mongo.exe rs_conf={_id:"rs",members:[{_id:0,host:"192.168.2.6:27017",priority:1}, {_id:1,host:"192.168.2.6:27018",priority:2}, {_id:2,host:"192.168.2.6:27019",priority:3}, {_id:4,host:"192.168.2.6:27020", arbiterOnly:true}]}返回这个代表成功:{"_id":"rs","members" : [ { "_id":0,"host":"192.168.2.6:27017","priority":1 }, { "_id":1,"host":"192.168.2.6:27018","priority":2 }, { "_id":2,"host":"192.168.2.6:27019","priority":3 }, { "_id":4,"host":"192.168.2.6:27020","arbiterOnly":true } ]}rs.initiate(rs_conf) 执行配置{"ok":1}rs.status() 查看状态{ "set":"rs","date" : ISODate("2024-08-10T02:40:20.391Z"),"myState":2,"term" : NumberLong(2),"syncSourceHost":"192.168.2.6:27019","syncSourceId":2,"heartbeatIntervalMillis" : NumberLong(2000),"majorityVoteCount":3,"writeMajorityCount":3,"votingMembersCount":4,"writableVotingMembersCount":3,"optimes" : { "lastCommittedOpTime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "lastCommittedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"readConcernMajorityOpTime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "appliedOpTime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "durableOpTime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z") }, "lastStableRecoveryTimestamp" : Timestamp(1723257586,1),"electionParticipantMetrics" : { "votedForCandidate":true,"electionTerm" : NumberLong(2),"lastVoteDate" : ISODate("2024-08-10T02:39:15.909Z"),"electionCandidateMemberId":2,"voteReason":"","lastAppliedOpTimeAtElection" : { "ts" : Timestamp(1723257547,5),"t" : NumberLong(1) }, "maxAppliedOpTimeInSet" : { "ts" : Timestamp(1723257547,5),"t" : NumberLong(1) }, "priorityAtElection":1,"newTermStartDate" : ISODate("2024-08-10T02:39:15.997Z"),"newTermAppliedDate" : ISODate("2024-08-10T02:39:16.928Z") }, "members" : [ { "_id":0,"name":"192.168.2.6:27017","health":1,"state":2,"stateStr":"SECONDARY","uptime":2677,"optime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"),"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"syncSourceHost":"192.168.2.6:27019","syncSourceId":2,"infoMessage":"","configVersion":1,"configTerm":2,"self":true,"lastHeartbeatMessage":"" }, { "_id":1,"name":"192.168.2.6:27018","health":1,"state":2,"stateStr":"SECONDARY","uptime":85,"optime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "optimeDurable" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"),"optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"),"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"),"lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.083Z"),"pingMs" : NumberLong(0),"lastHeartbeatMessage":"","syncSourceHost":"192.168.2.6:27017","syncSourceId":0,"infoMessage":"","configVersion":1,"configTerm":2 }, { "_id":2,"name":"192.168.2.6:27019","health":1,"state":1,"stateStr":"PRIMARY","uptime":85,"optime" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "optimeDurable" : { "ts" : Timestamp(1723257616,1),"t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"),"optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"),"lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"),"lastHeartbeat" : ISODate("2024-08-10T02:40:19.060Z"),"lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.022Z"),"pingMs" : NumberLong(0),"lastHeartbeatMessage":"","syncSourceHost":"","syncSourceId" : -1,"infoMessage":"","electionTime" : Timestamp(1723257555,1),"electionDate" : ISODate("2024-08-10T02:39:15Z"),"configVersion":1,"configTerm":2 }, { "_id":4,"name":"192.168.2.6:27020","health":1,"state":7,"stateStr":"ARBITER","uptime":85,"lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"),"lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.092Z"),"pingMs" : NumberLong(0),"lastHeartbeatMessage":"","syncSourceHost":"","syncSourceId" : -1,"infoMessage":"","configVersion":1,"configTerm":2 } ], "ok":1,"$clusterTime" : { "clusterTime" : Timestamp(1723257616,1),"signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0) } }, "operationTime" : Timestamp(1723257616,1)}

 

demo代码链接

go/mongochangestreamsdemo/demo at main · liuzhixin405/go (github.com)

mongo配置链接

config/mongo windows集群 at main · liuzhixin405/config (github.com)

博客园

这个人很懒...

用户评论 (0)

发表评论

captcha