
Node identity conflict across clusters in raft clusters built with the hashicorp/raft module


I implemented a raft cluster in Go using the github.com/hashicorp/raft module and ran into a problem in the following scenario:

For testing, I start the following two raft clusters. The cluster names, nodes, and IP addresses are listed below; both clusters are initialized via the BootstrapCluster method:

Cluster1 BootstrapCluster servers:
- node1: {raft.ServerID: c1-node1, raft.ServerAddress: 192.168.100.1:7000}
- node2: {raft.ServerID: c1-node2, raft.ServerAddress: 192.168.100.2:7000}
- node3: {raft.ServerID: c1-node3, raft.ServerAddress: 192.168.100.3:7000}

Cluster2 BootstrapCluster servers:
- node3: {raft.ServerID: c2-node3, raft.ServerAddress: 192.168.100.3:7000}
- node4: {raft.ServerID: c2-node4, raft.ServerAddress: 192.168.100.4:7000}
- node5: {raft.ServerID: c2-node5, raft.ServerAddress: 192.168.100.5:7000}

其中,"node3"的地址会存在2个集群中。

  1. "node1","node2"按照"Cluster1"启动:

sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node1

sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node2

  1. "node3","node4","node5"先按照"Cluster2"启动:

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node3

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node4

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node5

You will then observe that node3 flips back and forth between Cluster1 and Cluster2: one moment it belongs to Cluster1, the next to Cluster2.

INFO[0170] current state:Follower, leader address:127.0.0.5:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:53.330867 +0800 CST m=+169.779019126
INFO[0171] current state:Follower, leader address:127.0.0.1:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:54.308388 +0800 CST m=+170.756576126

My code is as follows:

package main

import (
	"flag"
	"fmt"
	"io"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/hashicorp/raft"
	log "github.com/sirupsen/logrus"
)

type raftCluster struct {
	localRaftID     raft.ServerID
	servers         map[raft.ServerID]raft.ServerAddress // raftID : raftAddressPort
	raft            *raft.Raft
	electionTimeout time.Duration
}

// Start creates the TCP transport, the in-memory stores, and the raft
// instance, then bootstraps the cluster with the configured servers.
func (r *raftCluster) Start() error {
	config := raft.DefaultConfig()
	config.HeartbeatTimeout = 2000 * time.Millisecond
	config.ElectionTimeout = 5000 * time.Millisecond
	config.CommitTimeout = 2000 * time.Millisecond
	config.LeaderLeaseTimeout = 2000 * time.Millisecond
	config.LocalID = r.localRaftID
	config.LogOutput = log.StandardLogger().Out
	r.electionTimeout = config.ElectionTimeout * time.Duration(len(r.servers)*2)

	localAddressPort := string(r.servers[r.localRaftID])
	tcpAddr, err := net.ResolveTCPAddr("tcp", localAddressPort)
	if err != nil {
		return fmt.Errorf("resolve tcp address %s, %v", localAddressPort, err)
	}
	transport, err := raft.NewTCPTransport(localAddressPort, tcpAddr, 2, 10*time.Second, log.StandardLogger().Out)
	if err != nil {
		return fmt.Errorf("fail to create tcp transport, localAddressPort:%s, tcpAddr:%v, %v",
			localAddressPort, tcpAddr, err)
	}

	snapshots := raft.NewInmemSnapshotStore()
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()
	fm := NewFsm()

	r.raft, err = raft.NewRaft(config, fm, logStore, stableStore, snapshots, transport)
	if err != nil {
		return fmt.Errorf("create raft error, %v", err)
	}

	var configuration raft.Configuration
	for sID, addr := range r.servers {
		server := raft.Server{
			ID:      sID,
			Address: addr,
		}
		configuration.Servers = append(configuration.Servers, server)
	}
	err = r.raft.BootstrapCluster(configuration).Error()
	if err != nil {
		return fmt.Errorf("raft bootstrap failed, conf:%v, %v", configuration, err)
	}
	log.Infof("bootstrap cluster as config: %v", configuration)
	return nil
}

// checkLeaderState periodically logs this node's view of the cluster:
// its state, the current configuration, and the leader it follows.
func (r *raftCluster) checkLeaderState() {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case leader := <-r.raft.LeaderCh():
			log.Infof("im leader:%v, state:%s, leader address:%s", leader, r.raft.State(), r.raft.Leader())
		case <-ticker.C:
			verifyErr := r.raft.VerifyLeader().Error()
			servers := r.raft.GetConfiguration().Configuration().Servers
			switch verifyErr {
			case nil:
				log.Infof("im leader, servers:%v", servers)
			case raft.ErrNotLeader:
				// not the leader: log the cluster leader we currently follow
				log.Infof("current state:%v, servers:%+v, leader address:%v, last contact:%v",
					r.raft.State(), servers, r.raft.Leader(), r.raft.LastContact())
			}
		}
	}
}

func main() {
	var (
		clusters  = flag.String("cluster", "", "cluster node address, fmt: ID,IP,Port;ID,IP,Port")
		clusterId = flag.String("id", "", "cluster id")
	)
	flag.Parse()
	if *clusterId == "" {
		log.Infof("cluster id missing")
		os.Exit(1)
	}

	// Parse the -cluster flag into a ServerID -> ServerAddress map.
	servers := make(map[raft.ServerID]raft.ServerAddress)
	for _, cluster := range strings.Split(*clusters, ";") {
		info := strings.Split(cluster, ",")
		var (
			nid   string
			nip   net.IP
			nport int
			err   error
		)
		switch {
		case len(info) == 3:
			nid = info[0]
			nip = net.ParseIP(info[1])
			if nip == nil {
				log.Infof("cluster %s ip %s parse failed", cluster, info[1])
				os.Exit(1)
			}
			nport, err = strconv.Atoi(info[2])
			if err != nil {
				log.Infof("cluster %s port %s parse failed, %v", cluster, info[2], err)
			}
		default:
			log.Infof("cluster args value is bad format")
			os.Exit(1)
		}
		log.Infof("cluster node id:%s, ip:%v, port:%d", nid, nip, nport)
		addr := net.TCPAddr{IP: nip, Port: nport}
		servers[raft.ServerID(nid)] = raft.ServerAddress(addr.String())
	}

	r := raftCluster{
		localRaftID: raft.ServerID(*clusterId),
		servers:     servers,
	}
	err := r.Start()
	if err != nil {
		log.Infof("raft cluster start failed, %v", err)
		os.Exit(1)
	}
	r.checkLeaderState()
}

// SimpleFsm implements a minimal raft.FSM.
type SimpleFsm struct {
	db database
}

func NewFsm() *SimpleFsm {
	fsm := &SimpleFsm{
		db: NewDatabase(),
	}
	return fsm
}

func (f *SimpleFsm) Apply(l *raft.Log) interface{} {
	return nil
}

func (f *SimpleFsm) Snapshot() (raft.FSMSnapshot, error) {
	return &f.db, nil
}

func (f *SimpleFsm) Restore(io.ReadCloser) error {
	return nil
}

type database struct{}

func NewDatabase() database {
	return database{}
}

func (d *database) Get(key string) string {
	return "not implemented"
}

func (d *database) Set(key, value string) {
}

func (d *database) Persist(sink raft.SnapshotSink) error {
	_, _ = sink.Write([]byte{})
	_ = sink.Close()
	return nil
}

func (d *database) Release() {
}

I tested multiple versions of hashicorp/raft and they all behave the same. Analyzing the current latest version v1.7.3, the cause appears to be the following:

  1. After startup, each node is initialized via BootstrapCluster, which bootstraps the cluster election. On node3 the following log can be seen, showing that during the election phase node3 is able to tell that it does not belong to Cluster1:
[WARN] raft: rejecting appendEntries request since node is not in configuration: from=c1-node1
  2. But once Cluster1 elects a leader, node3 may become a member of Cluster1 again. This is because Cluster1's leader keeps sending log entries to the nodes in its configuration via heartbeats, and during this process:
    1. The follower node does not check whether the leader making the request belongs to its own cluster.
    2. The follower only compares whether the term of the incoming request is larger than its own local term; if it is, the follower accepts and stores the entries and records the requesting leader as its own cluster's leader (see the sketch after this list).
    3. Likewise, once Cluster2 elects a leader, Cluster2's leader also keeps sending heartbeat/log requests to node3. As a result, node3 belongs to Cluster1 one moment and Cluster2 the next.
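
The following is a simplified, self-contained sketch of the follower behavior described in points 1 and 2 above. It is not the hashicorp/raft code; the types and names (followerState, appendEntriesReq, handleAppendEntries) are made up purely to illustrate that the only gate is the term comparison, with no membership check on the sender:

package main

import "fmt"

// appendEntriesReq is a stand-in for an AppendEntries request: only the
// term and the sender's ID matter for this illustration.
type appendEntriesReq struct {
	Term     uint64
	LeaderID string // never checked against the follower's configuration
}

// followerState is a stand-in for a follower's local state.
type followerState struct {
	currentTerm   uint64
	leaderID      string
	configuration map[string]bool // ServerIDs this follower believes are in its cluster
}

// handleAppendEntries mirrors the flaw: the only gate is the term comparison,
// so a newer-term leader from a *different* cluster is accepted and recorded.
func (f *followerState) handleAppendEntries(req appendEntriesReq) bool {
	if req.Term < f.currentTerm {
		return false // only an older term is rejected
	}
	// Missing check: something like `if !f.configuration[req.LeaderID] { reject }`
	f.currentTerm = req.Term
	f.leaderID = req.LeaderID // the foreign leader is adopted as "our" leader
	return true
}

func main() {
	node3 := &followerState{
		currentTerm:   5,
		configuration: map[string]bool{"c2-node3": true, "c2-node4": true, "c2-node5": true},
	}
	// A heartbeat from Cluster1's leader with a newer term is accepted even
	// though c1-node1 is not in node3's configuration.
	fmt.Println(node3.handleAppendEntries(appendEntriesReq{Term: 6, LeaderID: "c1-node1"})) // true
	fmt.Println(node3.leaderID)                                                             // c1-node1
}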

The flaw in this process lies in how a raft node accepts log entries and updates its leader. The code is in the hashicorp/raft module at raft.go:L1440, in the function func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest).

Modifying this function to also check the ID of the requesting leader avoids the problem:

// Ignore an older term
if a.Term < r.getCurrentTerm() {
	return
}

// yzc add: this is the check we added. Note that after rejecting we must
// return an error, otherwise the other cluster keeps re-electing.
if len(r.configurations.latest.Servers) > 0 && !inConfiguration(r.configurations.latest, ServerID(a.ID)) {
	r.logger.Warn("rejecting appendEntries request since node is not in configuration",
		"from", ServerID(a.ID))
	rpcErr = fmt.Errorf("node is not in configuration")
	return
}

// Increase the term if we see a newer one, also transition to follower
// if we ever get an appendEntries call
if a.Term > r.getCurrentTerm() || (r.getState() != Follower && !r.candidateFromLeadershipTransfer.Load()) {
	// Ensure transition to follower
	r.setState(Follower)
	r.setCurrentTerm(a.Term)
	resp.Term = a.Term
}

After recompiling and rerunning, node3 stays in Cluster2 throughout, and the following log can be seen on node3:

[WARN] raft: rejecting appendEntries request since node is not in configuration: from=c1-node1

In the Cluster1 leader's log, we can see it failing to heartbeat node3:

[DEBUG] raft: failed to contact: server-id=c1-node3 time=1m29.121143167s
[ERROR] raft: failed to heartbeat to: peer=127.0.0.3:800 backoff time=1s error="node is not in configuration"
