MIT6.824 2020 Lab3A&B Fault-Tolerance KV Service with Log Compaction

Lab 3A

实现一个 Fault Tolerance KV Service.

Tasks

  1. 实现 KV server 的 PutAppendGet RPC。
  2. Clerk 可以向 KV server 发送 PutAppend 以及 Get 请求。
  3. KV server 通过 StartOp 来附加 Raft 的日志。
  4. KV server 通过 ApplyCh 来获取 Raft 已经应用的指令。

Hints

  1. Clerk 的请求只有发送给 Leader 才是有效的,对于非 Leader 的节点需要立即返回错误。
  2. Clerk 中的所有请求需要不断重试,直到 KV Server 正确返回响应。
  3. Clerk 在每次请求成功后,需要记录上一次成功请求的 Leader Server ID,这样在下一次请求中就不必浪费时间找 Leader。
  4. 所有的 Clerk 请求只有在取得大多数节点共识的情况下(raft applied),才能在 KV Server 中返回。
  5. 即便在网络分区的情况下,也需要保证客户端的每个请求仅会执行一次。

Implementation

Clerk

Clerk 的工作是实现 Put, Append, Get 这三个接口,向 KV Server 不断发送对应的请求,直到 Server 返回成功。
Clerk 需要额外记录的信息包括自身的唯一标识 ClientID,请求的自增序号 SerialID ,还有目前的 server LeaderID

https://tva1.sinaimg.cn/large/008i3skNgy1gy6co57izej30800aq0st.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (ck *Clerk) sendRequestToServer(op string, key string, value string, serverID int) (string, bool) {
success := true
result := ""

ck.mu.Lock()
args := Args{
Op: op,
Key: key,
Value: value,
ClientID: ck.clientID,
SerialID: ck.serialID,
}
reply := Reply{}
ck.mu.Unlock()

err := RpcCallWithTimeout(ck.servers[serverID], RpcNameKVRequest, &args, &reply, rpcTimeoutLimit)
if err != nil || reply.Err == ErrWrongLeader {
success = false
} else if reply.Err == OK {
result = reply.Value
}
return result, success
}

KV Server

KV Server 的工作是实现 GetPutAppend RPC,使 Clerk 发送过来的请求能够在集群中达成共识并将操作应用到本地的存储中,并给 Clerk 返回取得共识的响应。

KV Server 需要处理的问题:

  1. 保证同一个 Clerk 同一个 SerialID 的请求只会执行一次。
  2. 只有发送给 Leader 的请求才是有效的。
  3. 只有在操作取得共识的情况下,才能返回给 Clerk。

解决方法:

  1. 我们在 Server 维护一个 map[ClientID]SerialID 的数据结构,记录每个 ClientID 最大的 SerialID,新进来的请求的 SerialID > map 中的对应 SerialID 时,我们才将在 local storage 中执行对应操作。这里要注意我们虽然在 request 进来的时候会做一遍检查,但是更新 map[ClientID]SerialID 的时机是在操作完 local storage 的时候,因此我们在操作 local storage 之前也需要做一遍检查。
  2. Server 调用 raft.Start() 的时候,会返回是否为 Leader,这个时候如果非 Leader 直接返回即可。
  3. 这个稍微复杂,我们需要维护一个 map[logIndex]chan Op 的数据结构。
    • handleEvent goroutine:当 raft.Start() 返回的时候会有一个 logIndex,我们将它初始化,并等待 channel 中有结果产出。
    • applyCh goroutine:监听 raft 的 applyCh,当 raft 有 op applied 时,对其进行处理,然后将结果放入对应 logIndex 的 channel 中,通知 handleEvent goroutine 能够返回结果了。

https://tva1.sinaimg.cn/large/008i3skNgy1gy78xc6jmyj30gq0hj3z5.jpg

踩过的坑点

  1. 需要格外注意 channel 的关闭和初始化,这里可能会有 data race,goroutine 泄漏,goroutine 阻塞等情况。让我 debug 了好久。
  2. Clerk 请求以及 waitForOp 这两个操作都需要设置一定的超时时间。超时时间的设置会影响程序同时运行的 goroutine 数量,可能需要调参 :(。

Lab 3B

实现带 Log Compaction 的 Fault Tolerance KV Service.

Tasks

  1. 修改 Raft,使其能够根据一个给定的 index,丢弃 index 之前的日志,并在只保存这个 index 之后的日志的情况下,能够继续正常工作。
  2. 修改 KV server,使其能够在检测到 Raft 持久化的状态过大的情况下,给 raft 一个 snapshot 并告诉 raft 丢弃旧的日志,raft 可以通过 persister.SaveStateAndSnapshot() 来持久化 snapshot。 KV server 在重启的时候需要从 persister 来恢复 snapshot。
  3. 修改 Raft,使 Leader 能够在发现 Follower 需要的日志已经被丢弃的时候,发送 InstallSnapshot RPC 给 Follower。Follower 在接收到 InstallSnapshot RPC 的时候,需要将 snapshot 传给其 KV Server。可以在 applyMsg 中添加新的信息,并通过 applyCh 完成这个工作。

Implementation

KV Server

KV Server 需要做的工作:

  1. 不断轮询 raft 的状态大小是否超过限制,若超过限制则开始做 snapshot。snapshot 中的信息需要包括 store 和 map[ClientID]SerialID
  2. applyCh 中获取 Op 的时候需要判断是否为 snapshot,若为 snapshot 则用 snapshot 来更新自身的 store 和 map[ClientID]SerialID。

Raft

Raft 需要做的工作:

  1. 提供一个接口给 KV Server,使其能够做 snapshot。该接口需要:
    • 实现日志压缩。
    • 将 KV Server 传过来的 snapshot 以及 Raft 本身的状态通过 persister.SaveStateAndSnapshot() 持久化。
  2. 由于日志压缩,旧的日志会丢失,所以我们无法直接通过 index 获取日志,需要考虑偏移量 rf.lastIncludedIndex。强烈建议先把这部分工作先单独做了,测通 lab2B,否则后面可能各种问题混在一起很难 debug。这里我的实现中有一个 trick 是,空日志用 LogEntries{&LogEntry{Command: nil, Term: lastIncludedTerm}} 来初始化,这样有效的 index 就从 1 开始,和 raft 论文的描述是一致的,而且对于 term 的判断也很简单,实现起来比较方便。
  3. 初始化的时候需要将 snapshot 通过 ApplyCh 发送给 KV Server。

doSnapshot

InstallSnapshot RPC

发送方:当 leader 丢失 follower 日志时,需要发送 InstallSnapshot RPC。这里可以使用 rf.nextIndex[followerID] < rf.lastIncludedIndex 来判断是否丢失。

接收者:这里需要判断 snapshot 包含的最后一个 logIndex 与 follower 日志的关系决定对日志进行什么操作。操作完日志后,需要将 snapshot 通过 applyCh 发送给 KV Server。

  1. args.LastIncludedIndex>rf.LastLogIndex,代表 leader 的 snapshot 完全包含了该 follower 的日志,因此可以把 follower 的日志都覆盖。
  2. args.LastIncludedIndex<rf.LastLogIndex && args.LastIncludedTerm!=rf.log[args.LastIncludedIndex].Term,说明 follower 的日志和 leader 发生冲突,因此也将 follower 的日志都覆盖。
  3. 以上两个条件都不满足,说明 follower 有部分日志是 snapshot 没有的并且可能是一致的,那么就截断 follower 在 args.LastIncludedIndex 之前的日志,后面的日志都保留。

踩过的坑点

  1. 日志 index 的偏移计算需要封装好,这样实现起来比较方便,这里折腾了不少时间。index 计算修改完后先跑过 Lab2B 的 test,能减少后面 debug 成本。
  2. KV Server 在初始化的时候需要从 raft 读取 persister 里面的 snapshot,raft 会通过 applyCh 来将 snapshot 传递给 KV Server。由于这两者是需要在同一个 goroutine 中处理的,所以 applyCh 的缓冲大小需要改为 1,否则会发生阻塞.

小结

至此 MIT6.824 Lab1-3 终于完结了,Lab4 暂时先不会去做了。做这个课程断断续续地做了好久,自己业余浪费不少时间在摸鱼了QuQ,后面为了赶年度任务才 push 自己做完的。
在这个过程中,至少是熟悉了 Raft 的大部分细节,以及如何基于 Raft 搭建简单的分布式应用。同时也让自己保持一种学习的状态吧。继续加油~