实验准备
- 实验代码:
git://g.csail.mit.edu/6.824-golabs-2021/src/raft
- 如何测试:
go test -run 2A -race
- 相关论文:Raft Extended Section 5.2
- 实验指导:6.824 Lab 2: Raft (mit.edu)
实验目标
实现Raft算法中Leader Election(RequestVote RPC
)和Heartbeats(AppendEntries RPC
)。确保只有一个Leader被选中,且若无错误该Leader会一直唯一存在,当该Leader下线或发生其他错误导致发出的数据无法被成功接收,则会产生新的Leader来替代。
一些提示
- 参考论文的Figure 2实现相应的结构体和函数。
- 通过
Make()
创建一个后台goroutine,用于一段时间(election timeout
)没收到其他节点的消息时,通过RequestVote RPC
发起选举。 - 尽量保证不同节点的
election timeout
不会让他们在同一时间发起选举,避免所有节点只为自己投票,可以通过设置随机的election timeout
来实现。 - 测试要求Hearbeats频率每秒不高于10次。
- 测试要求New Leader在Old Leader下线后5秒内出现,考虑到一次换届多轮选举的情况(提示3的情况),election timeout应当足够短。
- 论文中对于
election timeout
设定在150ms – 300ms之间,前提是Heartbeat频率远远超过150ms一次。由于提示4的限制,实验中election timeout
应该更大。 - 推荐使用
time.Sleep()
而不是time.Timer
或time.Ticker
来实现定期或延迟行为。 - 不要忘记实现
GetState()
。 - 使用
rf.killed()
判断测试是否关闭了该节点。 - RPC相关结构字段都应使用大写字母开头,这和Go语言的语法有关。
Raft简介
日志被理解为来自客户端的请求序列,在一个集群中,有唯一的一个节点用于接收客户端请求,称为”Leader Node”,为了保证数据的安全性,”Leader Node”的日志应该复制给若干个节点用于备份,称为”Follower Node”。”Follower Node”的日志需要和”Leader Node”保持一致,Raft就是一种为了管理日志复制而产生的一致性算法。
领导者选举
Raft集群通常有奇数个节点,设为N,集群允许N/2个节点失效,在正常情况下,集群有1个Leader和N-1个Follower组成,当Leader失效时,会产生除了Leader和Follower外的第三种身份:Candidate。
Follower在election timeout后,身份转换为Candidate,在获取(N-1)/2个其他节点的选票后,身份转换为Leader。
主要结构
首先是Raft结构,具体的属性在论文的Figure 2中已经给出,此外还需要额外的两个属性。
role
:当前节点的身份。lastRecv
:上一次收到其他节点消息的时间。
被注释的字段在Part A中可以忽略,且在本小节中,为了方便理解,请先忽略currentTerm字段。
type Raft struct {
mu sync.Mutex
peers []*labrpc.ClientEnd// 集群中所有的节点
// persister *Persister
me int// 当前节点在peers中的索引
dead int32// 标记当前节点是否存活lastRecv time.Time
role RolecurrentTerm int
votedFor int
// log []LogEntry// commitIndex int
// lastApplied int// nextIndex []int
// matchIndex []int
}
超时选举
如果当前节点不是Leader,且超过election timeout未收到其他节点的消息,则发起选举。
此处设置election timeout在150ms – 300ms之间。
func (rf *Raft) electionTimeout() time.Duration {
return time.Duration(150 + rand.Int31n(150)) * time.Millisecond
}
发起选举后,身份转换为Candidate,并通过RequestVote RPC获取其余节点的选票。在获取(N-1)/2个其他节点的选票后,身份转换为Leader。成为Leader后,需要立即向其余节点发送心跳,宣告自己的存在。
代码中的注释即论文Figure 2中的逻辑。
func (rf *Raft) elect() {
for !rf.killed() {
if rf.role == Leader || time.Since(rf.lastRecv) < rf.electionTimeout() {
return
}
/* On conversion to candidate, start election. */
rf.role = Candidate
/* Vote for self. */
rf.voteFor = rf.me
voteCount := 1
/* Reset election timer. */
rf.lastRecv = time.Now()
/* Send RequestVote RPCs to all other servers. */
for i, peer := range rf.peers {
if i == rf.me {
continue
}
reply := RequestVoteReply{}
peer.Call("Raft.RequestVote", &RequestVoteArgs{
CandidateId: rf.me,
}, &reply) if reply.VoteGranted {
voteCount++
}
}
/* If votes received from majority of servers: become leader. */
if voteCount > len(rf.peers)/2 {
rf.role = Leader
rf.votedFor = -1
rf.heartbeat()
}
time.Sleep(10 * time.Millisecond)
}
}
Explain 1:如何理解
rf.role == Leader
?Follower和Candidate都可以参加选举,Candidate可以参加的原因在于,选出一个Leader可能不止一轮选举,假设非常不幸,所有节点都在同一时刻发起选举,他们都把自己的选票投给了自己,那么本轮选举将无法选出Leader。
这时候将开启第二轮选举,因此不能限制只有Follower可以参与选举。
发送心跳
不携带日志的日志复制即心跳,Leader通过心跳刷新其余节点的election timeout。Hint 4限制了心跳频率在每秒10次,因此这里让心跳一次后休眠100ms。
func (rf *Raft) heartbeatInterval() {
return 100 * time.Millisecond
}func (rf *Raft) heartbeat() {
for !rf.killed() {
if rf.role != Leader {
return
}
for i, peer := range rf.peers {
if i == rf.me {
continue
}
reply := AppendEntriesReply{}
peer.Call("Raft.AppendEntries", &AppendEntriesArgs{}, &reply)
}
}
time.Sleep(rf.heartbeatInterval())
}
RPC全称Remote Procedure Call,即远程过程调用,通俗的讲就是调用其他节点上的函数。例如
peer.Call("Raft.AppendEntries", &args, &reply)
,就是调用了对应节点的AppendEntries函数,参数是args,返回值保存在reply中。heartbeat是主动,AppendEntries是被动。elect是主动,RequestVote是被动。
RequestVote
Candidate通过远程调用RequestVote,向其他节点索要选票。论文的Figure 2中也给出了RequestVoteArgs和RequestVoteReply的定义。在Part A中不需要关注LastLogIndex和LastLogTerm两个字段。同样的,为了方便理解,请先忽略Term的概念。
type RequestVoteArgs struct {
Term int
CandidateId int
// LastLogIndex int
// LastLogTerm int
}type RequestVoteReply struct {
Term int
VoteGranted bool
}
在一轮投票中,每个节点只有一张选票,Candidate会投给自己,而Follower会投给第一个向他索要选票的Candidate。
代码中的注释即论文Figure 2中的逻辑。
RequestVote中需要刷新election timeout,一次换届多轮选举的情况。
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
reply.VoteGranted = false
rf.lastRecv = time.Now()
/* If votedFor is null or candidateId, grant vote. */
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
rf.votedFor = args.CandidateId
reply.VoteGranted = true
}
}
Explain 2:如何理解
rf.votedFor == args.CandidateId
?逻辑上来说这个条件是没必要的,去掉这个条件依旧能通过所有测试。我猜测这个条件是为了防止回复的网络包丢失,发送方重传,因此需要接收方再次投出选票。
AppendEntries
Leader通过AppendEntries远程调用,刷新其他节点的election timeout,保证在自己存活期间,不会有其他节点发起选举。论文的Figure 2中也给出了AppendEntriesArgs和AppendEntriesReply的定义。
被注释的字段在Part A中不需要关注,同样的,为了方便理解,请先忽略Term字段。
type AppendEntriesArgs struct {
Term int
// LeaderId int
// PrevLogIndex int
// PrevLogTerm int
// Entries []LogEntry
// LeaderCommit int
}type AppendEntriesReply struct {
Term int
Success bool
}
Hint 2中,收到其他节点的消息,就刷新election timeout。因此RequestVote和AppendEntries中都需要更新rf.lastRecv
。
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
reply.Success = true
rf.lastRecv = time.Now()
}
唯一的Leader
竞选成功的条件为voteCount > len(rf.peers)/2
且每个节点只有一张选票,这保证了最多只有一个节点达到竞选成功条件,保证了Leader的唯一性。
现在考虑唯一的Leader因为某些网络问题导致Leader的心跳无法发出,那么剩余的N-1个节点将会选出新的Leader,剩余的N-1个节点可以继续提供正常的服务。那么如果Old Leader的网络问题因为某些原因恢复了,整个集群将同时出现两个Leader,这样整个集群的日志的一致性就不能保证。
引入任期
Term(任期)解决了可能出现多个Leader的问题。
Term是一个单调递增的整型值,所有节点的Term应保持一致,Term的自增只发生在从Follower到Candidate的转换中,即只有选举的时候,Term才会自增1。
再次考虑上面的问题,当Old Leader恢复后,由于剩余的N-1个节点又经历了至少一次选举,因此剩余的N-1个节点包括New Leader的Term都大于Old Leader的Term,Raft算法规定,任意节点感知到Term更高的节点,将转换为Follower;任意节点感知到Term更低的节点,将忽略对方的消息,并告知对方自己的Term。
这样,当Old Leader收到New Leader的更高Term的心跳时,会将自己的身份转换为Follower,保证了Leader的唯一性。
什么是感知到其他节点?
AppendEntries或RequestVote两个RPC的请求或回复中都包含Term,Old Leader感知到New Leader有两种途径。
- 收到New Leader的心跳,发现AppendEntriesArgs.Term更高。
- 向New Leader或其余节点发送心跳,发现AppendEntriesReply.Term更高。
实验总结
上面的图片就是本文多次提及的论文的Figure 2,我用绿色的线框选了Part A需要实现的部分。
引入Term后的代码本文就不再给出了,参照图片补充剩余的实现就可以,需要注意的是,本文为了简洁代码,省略了数据同步问题,-race
可以暴露出你代码的data race问题,记得为临界资源上锁。
最后,为了证明我不是在乱写,附上我的测试结果。