YoloKokura

NOTE:本文为6.824(分布式系统)Lab 2的回顾,实验要求见这里。因为要遵守课程的Collaboration Policy,所以本文不会分享任何实现细节的代码(可能还是会有一些逻辑性的简单代码帮助阐明思路)。

Lab 2要求我们用Go语言实现Raft算法。客户将与Raft集群中的Leader节点通信,其操作会被记录到Leader节点的Log中。集群中的若干Raft节点可以通过RPC通信,以确保Leader节点的Log能够复制到集群中的大部分节点中,从而实现容错。

课程提供了不少学习资料:

  • Raft论文:这篇论文对算法中关键API进行了十分详尽的描述,尤其是Figure 2和Figure 13,在编码时应该反复琢磨。事实上,大多数编码的bug都是因为没有完全按照论文的思路来。
  • Raft动画:这个动画可以帮助我们理解Raft算法的执行过程,我在实现Leader Election的时候反复参考了这个动画。
  • Students' Guide to Raft:这是TA写的一篇博客,对我们在做Lab时可能踩的坑都做了说明。

你可能和我一样,这么多资料读下来直接被overwhelmed了,anyway,最根本的一点是,要对原论文的思路保持敬畏,不要有意无意地对算法细节进行“优化”。

下面我们从4个子Lab的顺序回顾一下实现过程。

Part 2A: leader election

在一个Raft集群中,只有Leader节点能接收客户端的请求,并向集群中的其他节点复制Log。当Leader节点宕机后,集群中的其他节点需要能意识到这一点,并选举出新的Leader节点。由于之前的Leader能够将Log复制到大多数节点中,所以新的Leader节点保有之前的Log,从外界看,集群的状态是连续的。

我们首先看看Figure 2中对Raft节点状态的描述:

我们当前只关注和选举有关的状态:

  • currentTerm:当前任期(我们后面还是称Term),Raft节点通过Term判断整个集群的状态,Term越大则代表该节点处于更新的状态。其他节点如果在通信过程中遇到了Term更新的节点,则必须无条件接收新的Term。同时,只有Term最大的节点有资格被选举为Leader。
  • votedFor:在每一轮选举中,一个节点只能投出一票,当它遇到了下一个向它争取选票的节点时,如果它和候选者Term相等,则首先检查这个变量看是否已经投票,如果没有,则投给这个节点。(如果Term不相等,则根据两者Term大小,如果候选者Term更大,则更新自己的Term,然后投票给候选者;反之忽视这次通信,在回复中写入自己的Term)
  • phase:这个变量不在论文的描述中,但是在实现中很有用。

Raft节点的状态转移

Raft节点在全过程中会在如下几个状态切换:

go
const (
    FOLLOWER = iota
    CANDIDATE
    LEADER
)

原论文中有这样一个状态转移图,同时前面提到的动画对这个过程的描述也很生动。

  • 在初始状态下,所有节点都是FOLLOWER,FOLLOWER状态的节点希望能每隔一段时间就收到LEADER的AppendEntries请求,即心跳包。
  • 如果在这段时间没有收到心跳包,则认为LEADER宕机,它将自己的Term加1,变成CANDIDATE状态,向其它节点发送RequestVote请求,争取选票。
  • 其它的节点(不论状态),在收到RequestVote后,如果自己的Term更大,则拒绝投票,并将自己的Term写入回复,否则首先检查自己的votedFor,如果没有投票,则投给候选者,并将自己的Term写入回复。
  • 如果候选者收到了大多数节点的投票,则成为LEADER,否则继续保持CANDIDATE状态,直到下一轮选举。
  • 如果一个节点在收到的消息中发现,对方有更大的Term,则它无条件变为FOLLOWER,同时更新自己的Term。

在实现过程中,每个节点需要保留一个electionTimeout状态,每当FOLLOWER收到心跳包时,就重置这个状态,如果在这个状态内没有收到心跳包,则变为CANDIDATE。同理,CANDIDATE在超时之后也将Term加1,并发起一轮新的选举。LEADER则不用管这个状态。

总结起来,在选举过程中有两个时间变量需要考虑:

  1. electionTimeout:FOLLOWER和CANDIDATE的超时时间,如果在这个时间内没有收到心跳包,则变为CANDIDATE,发起新的选举。
  2. heartbeatInterval:LEADER的心跳包发送间隔。

在实现过程中,我每隔50ms检查一次当前状态,如果是LEADER,就向其它节点发送心跳包,如果是FOLLOWER或CANDIDATE,则检查electionTimeout,如果超时,则变为CANDIDATE,发起新的选举。

心跳机制

我们首先关注已经存在一个LEADER,并且不会出现LEADER宕机的情况,这样我们就只需要实现AppendEntries:

该函数有两重作用:

  1. 作为心跳包,用于维持LEADER的地位。
  2. 用于复制Log,当客户端向LEADER发送请求时,LEADER会将请求写入自己的Log,并向集群中的其他节点发送AppendEntries请求,要求它们也将这条Log写入自己的Log中。

为了不让问题复杂化,此时我们只关心第一个作用。我在Raft节点启动时,设置一个独立的goroutine来处理心跳包的发送和electionTimeout的检查,这样就不会影响到其他的函数调用。

go
func (rf *Raft) ticker() {
    for !rf.killed {
        time.Sleep(50 * time.Millisecond)
        rf.mu.Lock()
        if rf.phase == LEADER {
            rf.replicateLog(true)
        }
        if time.Now().After(rf.electionTimeout) {
            rf.startElection()
        }
        rf.mu.Unlock()
    }
}

这里的replicateLog函数实际上综合了前面所提的两个作用,参数表明此时是发送心跳包还是复制Log。对于LEADER而言,在其视角中,其它的Raft节点都是FOLLOWER,因此它只需要遍历rf.peers,向每个节点发送AppendEntries请求即可。在这个过程中,LEADER会将心跳包所需的参数填入请求中,Figure 2中其它的参数可以简单设置零值,此时还不用管,注意发送RPC请求应该是一个并发的过程,每一次发送都在一个新的goroutine中进行。完成发送之后,LEADER会检查每个节点的回复,这个阶段只需要像所有节点一样,检查Term的大小即可:

  1. 回复中的Term更大:LEADER变为FOLLOWER,更新自己的Term,并且重新设置electionTimeout
  2. 回复中的Term更小:忽略这个回复。
  3. 两者相等:这是一次正常的收发。

对于FOLLOWER而言,它同样进行检查Term的过程,如果是合法的TERM(不考虑选举,则始终相等),就重新设置electionTimeout即可。

实现选举过程

现在我们增加难度,考虑LEADER可能宕机的情况:

如果一个FOLLOWER在electionTimeout内没有收到心跳包,则认为LEADER宕机,它将自己的Term加1,变成CANDIDATE状态,,重新设置electionTimeout,并向其它节点发送RequestVote请求,争取选票。

当对方回复后,我们需要检查两件事:

  1. Term是否合法:这和前面的Term检查逻辑一致。
  2. 此时该节点是否仍然是CANDIDATE:有可能集群中多个节点同时发起选举,而该节点收到了Term更大的CANDIDATE的RequestVote,自动变回了FOLLOWER,此时它应该忽略自己之前争取的投票结果,

我使用一个voteCounter变量来记录得票数,此外,为了确保从CANDIDATE到LEADER的转变只发生一次(超过半数即转变,但是超过半数后可能还会收到新的赞成票),我使用了sync.Once

go
func (rf *Raft) startElection() {
    ...
    var becomeLeader sync.Once
    voteCounter := 1 // vote for itself

    for i := range rf.peers {
        if i == rf.me {
            continue
        }

        go rf.candidateRequestVote(i, &args, &voteCounter, &becomeLeader)
    }
}

func (rf *Raft) candidateRequestVote(server int, args *RequestVoteArgs, voteCounter *int, becomeLeader *sync.Once) {
    ... 

    if reply.VoteGranted {
        *voteCounter++
        if *voteCounter > len(rf.peers)/2 {
            becomeLeader.Do(func() {
                ... // become LEADER
            })
        }
    }
}

另外,electionTimeout需要用随机数重置,这么做是为了防止多个节点同时发起选举,均未获得过半选票,又同时等待相同时间再选举的活锁现象。

Part 2B: log

在这一部分我们需要实现Log的复制,这也是Raft的核心功能。LEADER需要保存用户操作,并将其复制给其他节点。当大部分节点都复制了这条Log后,LEADER才能将其应用到状态机中,这样才能保证集群中的所有节点都有相同的状态。在一些场景中,LEADER可能由于网络原因暂时失联,但是仍然可以保存用户操作,当它重新连接到集群,需要能在不丢失数据的情况下被重新选举为LEADER,并将这些操作复制给其他节点。这意味着我们需要在Part 2A的选举功能上额外考虑用户操作的时效性。

在该阶段,用户操作日志可以直接保存到一个Log数组中,需要注意的是论文中log数组的索引从一开始,在编程时需要对数组索引进行转换。

在Lab中,用户通过Start函数向Raft节点发送请求,大致逻辑如下:

  1. 如果当前节点不是LEADER,则返回false。
  2. 初始化Log,将其加入到Log数组中。
  3. LEADER尝试复制Log,与此同时向客户端返回预期的Log索引。

复制Log实际上使用的是前面提到的replicateLog函数,编码的时候我们很容易想要将心跳包的发送和Log的复制分开,但是这样会加重编码负担,而且在后面心跳包中也会检测Log的复制情况,两者的逻辑大致相似,不如直接写到一起。

每个Log包含以下信息:

go
type Log struct {
    Command interface{}
    Term    int
    Index   int
}

Command是用户操作,Term是该Log所在的Term,Index是该Log在Log数组中的索引。Raft使用Term和Index来标识一个Log,Term可以用来检测Log是否连续(想象一下如果一个节点在中间很多Term中都没有Log,可能是因为网络原因导致它有很长时间脱离集群,这个时候需要从它最早的没有冲突的Log位置开始复制),Index用来标识Log在数组中的位置,此时的实现可能是简单的索引值加1,但是后面当我们引入Log Compaction后,Log的索引将会重写。

区分commit和apply

我们回头看下Raft节点需要保存的状态:

此时我们需要弄清楚commitIndex和lastApplied的区别,或者commit和apply的区别,我在最开始做Lab时也对此感到疑惑,但是实际上:

  • commit代表一个当前节点已经记录的Log。
  • apply代表一个当前节点已经应用到状态机的Log。

当LEADER发现大多数节点都已经commit了某个Log,则可以将其apply到状态机中。在Lab的实现中,apply的Log会被放入一个applyCh通道中通知上层应用,这是通过一个条件变量实现的。

go
func (rf *Raft) apply() {
    rf.applyCond.Broadcast()
}

func (rf *Raft) applier() {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    for !rf.killed() {
        if rf.commitIndex > rf.lastApplied && rf.lastLogIndex() > rf.lastApplied {
            ... // write applyCh
        } else {
            rf.applyCond.Wait()
        }
    }
}

复制日志

LEADER通过nextIndexmatchIndex来记录其他节点的复制情况,nextIndex代表下一个需要复制的Log的索引,matchIndex代表已经复制的Log的索引。两者在每次选出新LEADER后重新初始化。

LEADER将根据nextIndex向FOLLOWER发送log,如果正常返回,则将更新nextIndexmatchIndex,否则将nextIndex减一,重新发送。LEADER会不断重复发送,直到FOLLOWER接收到Log。每次log复制完成后,LEADER将检查matchIndex,如果大多数节点的matchIndex都大于commitIndex,则将commitIndex更新为matchIndex中的最小值,并且将该Log应用到状态机中。

可见,matchIndex代表和LEADER中log一致的最新log索引,nextIndex代表下一个要发送的log位置。另外,在检查matchIndex时,还需要限定matchIndex的Term必须和当前Term一致,这是为了防止在选举过程中,新的LEADER将旧的Log应用到状态机中,见Figure 8,即当前任期的LEADER无法确定之前的log是否成功复制到了大多数节点。

Part 2C: persistence

这部分主要实现Raft节点状态持久化,使得节点重启后也能够恢复到之前的状态。这部分在编码上难度不大,只需要利用在persost函数和readPersist函数中利用Lab特供的gob库进行编码解码即可。参与编码解码的数据为之前状态图中的持久状态,每次状态改变时即调用persist函数存档,当一个Raft节点重启时,将在Make函数中调用readPersist函数恢复状态。

Part 2C 还额外要求对日志复制进行优化,相关内容在原论文中的表述为:

If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry. In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.

这其实是说之前发现冲突时,LEADER会将nextIndex减一,即每次退回一步重新尝试发送log,但是这样做的效率很低,可能要回退多次才能到达两个节点一致的日志位置,因此不如让FOLLOWER返回两者第一次发生冲突的位置,LEADER直接从这个位置开始发送即可。

Part 2D: log compaction

一个长时间运行的Raft服务可能包括大量的log数据,将这些数据全部放在内存中某个数组里显然是不现实的,因此需要对log进行压缩,即将一些旧的log持久化,这样log数组就可以删除这些log。另外,LEADER还必须将保存的这部分快照发送给FOLLOWER,如果FOLLOWER落后太多,这样还能加速FOLLOWER的同步过程。为了实现这个功能,我们需要在每个Raft节点中加入lastSnapshotIndexlastSnapshotTerm状态,用于记录当前快照中的最新log信息,方便在发送快照时进行判断。

修改log数组结构

可以预见,一旦我们删除log数组中较旧的一部分元素,就会破坏利用index定位log的机制,我的想法是用一个offset值记录数组中第一个log在整个逻辑log数组中的位置,之后利用index-offset定位log。这样,当我们删除log数组中的一部分时,只需要更新offset即可,而不需要对整个数组进行移动。除此以外,还可以给新的结构增加一些工具性的函数,比如获取最后一个log元素的Term或者Index,删除某个位置之前的所有log等。

完成设计之后我们需要仔细查看之前的代码,用新的结构替换原来的log数组。

快照发送

Lab提供了Snapshot接口供应用层调用,也就是说,将何时压缩日志的选择权交给了用户,我们因此只需要关注如何实现快照发送即可。

论文中考虑的是快照容量很大,需要切片分块发送的场景,因此需要offsetdone两个参数来提示FOLLOWER当前接收进度。Lab中只要求我们一次将快照发送完毕,因此不需要做上述考虑。

当LEADER发送AppendEntries请求复制日志时,它首先判断nextIndexlastSnapshotIndex的大小关系,如果将要发送的log在快照之前,说明FOLLOWER落痕太多,则这一轮发送改为发送InstallSnapshot请求(也可以实现为发送快照之后紧接着发送一次快照之后的日志),下次则进入正常的日志复制流程。收到应答之后仍然像正常的日志复制一样检查Term合法性,然后更新nextIndexmatchIndex

FOLLOWER在收到快照后同样进行合法性检查,然后根据内存中的数组和快照中保存的数组的重叠情况,对内存中数组进行裁剪即可。此外,Lab要求我们在收到快照后,向应用层返回一个快照版本的ApplySmg

Tags: