Back

mit-6.824 lab3: RaftKV

实现一个基于Raft的分布式KV存储服务

Lab3: KVRaft

lab链接 https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html

本次lab中我们需要使用lab2中实现的Raft库来构建一个可容错的 Key/Value 存储服务,要求其对外提供强一致性(Strong consistency)。

这个KV存储服务支持Get/Put/Append三种客户端操作。客户端通过RPC与集群中的leader通信,leader接收到请求后将其包装在一条Raft日志中下放到Raft层进行共识,日志被apply后返回客户端结果。

一些思考

在PartA的描述中提到,leader在将一个请求下放到raft层之后,commit之前宕机,这时它无法回复Clerk。又或者是,这条日志成功commit,但返回的RPC丢失。Clerk在规定时间内没有收到结果,会向另一台主机(可能是新选出的leader)发送RPC,这条日志最终被commit之后又会被应用于状态机,从而状态机执行了两次相同的请求。

这要求我们能够判断重复的请求。因此每个请求都需要被唯一标识,请求中需要加上(ClientID, RequestID),Clerk每次请求成功之后自增RequestID。

我们还需要在遇到重复的请求时直接返回第一次请求时的结果,这需要我们保存每一个Clerk的最后一次请求的结果ClientID -> (RequestID, LastResponse)。只需要保存最后一次请求结果是因为如果服务端收到RequestID = x的RPC,说明这个Clerk已经收到了RequestID为[1, x-1]之间内的所有请求的结果,服务端如果再次收到这个RequestID在区间之内的请求说明该RPC过期,直接丢弃即可。

client

我将3种请求共用了一个RPC,简化了逻辑。

Clerk保存一个leaderID,请求失败了再换另一个server,请求成功了自增requestID。

type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	leaderID  int
	clientID  int64
	requestID int
}

func (ck *Clerk) Get(key string) string {
	return ck.Command(key, "", OpGet)
}

func (ck *Clerk) Put(key string, value string) {
	ck.Command(key, value, OpPut)
}

func (ck *Clerk) Append(key string, value string) {
	ck.Command(key, value, OpAppend)
}

func (ck *Clerk) Command(key, value string, op Operation) string {
	req := getCommandRequest(key, value, op, int(ck.clientID), ck.requestID)	
	for {
		resp := CommandResponse{}
		if !ck.servers[ck.leaderID].Call("KVServer.Command", &req, &resp) || resp.Err == ErrWrongLeader || resp.Err == ErrTimeout {		
			ck.leaderID = (ck.leaderID + 1) % len(ck.servers)
			continue
		}

		ck.requestID++
		return	resp.Value	
	}
}

server

KVServer的结构体如下:

type KVServer struct {
	mu      sync.RWMutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	persister        *raft.Persister
	waitChs      	 map[int]chan *CommandResponse
	db               map[string]string
	lastSessions     map[int]Session
	lastAppliedIndex int
}

state-machine

本次lab中只需使用一个内存版本的 KV 状态机 map[string]string。

RPC handler

客户端请求来临时,Server端会启动一个协程作为 RPC handler 来处理客户端请求,其中会调用 Raft.Start 函数将请求下放到 Raft 层形成一条日志去做共识。在 Raft 层,每条被commit的日志会按照 index 的顺序写入 applyCh 中,上层必须也按 index 序从applyCh中读出日志并应用于状态机,这样才能保证不同节点上的数据一致。

这要求必须有一个单独的applier协程来循环读applyCh,并应用于状态机。由于来自不同客户端的请求是并发的,如果在RPC handler协程中直接读applyCh无法保证index序。返回给客户端的response要根据日志应用于状态机的结果来生成,这需要我们处理RPC handler和applier协程之间的通信问题。

自然想到使用channel来通信,使用一个 waitChs map (log index -> response) 来记录每一个请求对应的channel。在RPC handler协程将日志下放到Raft层之后,在 waitChs 中注册一个channel并阻塞读,applier协程读出日志,应用于状态机之后生成response写入这个channel。RPC handler协程在规定时间内读出结果则正常返回客户端,若超时则返回超时。

func (kv *KVServer) Command(req *CommandRequest, resp *CommandResponse) {
	// Your code here.
	kv.mu.RLock()
	defer DPrintf("[KVServer %d] reply %+v for Request %+v", kv.me, resp, req)

	DPrintf("[KVServer %d] received Request %+v from Clerk", kv.me, req)
	if req.Op != OpGet && kv.isDuplicatedRequest(req.ClientID, req.RequestID) {
		resp.Err = kv.lastSessions[req.ClientID].Err
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()

	index, _, isLeader := kv.rf.Start(*req)
	if !isLeader {
		resp.Err = ErrWrongLeader
		return
	}

	DPrintf("[KVServer %d] add command into raft layer [index %d]", kv.me, index)
	kv.mu.Lock()
	ch := kv.getWaitCh(index)
	kv.mu.Unlock()

	select {
	case result := <-ch:
		resp.Value = result.Value
		resp.Err = result.Err

	case <-time.NewTimer(500 * time.Millisecond).C:
		resp.Err = ErrTimeout
	}

	go func() {
		kv.mu.Lock()
		kv.removeWaitCh(index)
		kv.mu.Unlock()
	}()
}

func (kv *KVServer) applier() {
	for !kv.killed() {
		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				command := applyMsg.Command.(CommandRequest)
				
				kv.mu.Lock()
				if applyMsg.CommandIndex <= kv.lastAppliedIndex {
					DPrintf("[KVServer %d] discard out-of-date apply Msg in [index %d]", kv.me, applyMsg.CommandIndex)
					kv.mu.Unlock()
					continue
				}
				
				kv.lastAppliedIndex = applyMsg.CommandIndex
				var response *CommandResponse
				if command.Op != OpGet && kv.isDuplicatedRequest(command.ClientID, command.RequestID) {
					DPrintf("[KVServer %d] received a duplicated command in [index %d]", kv.me, applyMsg.CommandIndex)
					response = kv.lastSessions[command.ClientID].CommandResponse
				} else {
					response = kv.applyCommand(command)
					if command.Op != OpGet {
						kv.lastSessions[command.ClientID] = Session{
							RequestID:       command.RequestID,
							CommandResponse: response,
						}
					}
				}

				if kv.needToSnapshot(applyMsg.RaftStateSize) {
					DPrintf("[KVServer %d] reach maxraftstate, take a snapshot till [index %d]", kv.me, applyMsg.CommandIndex)
					kv.takeSnapshot(applyMsg.CommandIndex)
				}

				if currentTerm, isLeader := kv.rf.GetState(); currentTerm == applyMsg.CommandTerm && isLeader {
					ch := kv.getWaitCh(applyMsg.CommandIndex)
					ch <- response
				}

				kv.mu.Unlock()
			} else {
				kv.mu.Lock()
				DPrintf("[KVServer %d] received a snapshot from raft layer [index %d, term %d]", kv.me, applyMsg.SnapshotIndex, applyMsg.SnapshotTerm)
				if kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot) {
					kv.applySnapshotToService(applyMsg.Snapshot)
					kv.lastAppliedIndex = applyMsg.SnapshotIndex
				}
				kv.mu.Unlock()
			}
		}
	}
}

有几点需要注意:

  • apply日志时需要防止状态机回滚。在lab2中提到作为follower的节点可能收到leader的install snapshot,将snapshot写入applyCh中,此时读applyCh的顺序是:旧日志1 -> 新快照 -> 旧日志2。应用了新快照之后要避免再次应用旧日志,所以应用快照之后也要更新 lastAppliedIndex,应用日志时要先判断是否 applyMsg.CommandIndex <= kv.lastAppliedIndex。

  • 仅对leader的waitCh进行通知。每个节点在读出日志后都要提交到状态机,且更新lastSessions。但只有leader需要将response写入waitCh。leader可能会在提交日志后失去leader身份,所以在applier中写入response前要先判断。此时RPC handler协程就让其超时。

  • 客户端的非读请求需要两次去重。重复的请求到来时,之前相同的请求可能已经被应用于该节点的状态机,也可能其对应的日志还没被commit。因此需要在RPC handler中调用Start之前以及日志commit之后应用于状态机之前两次去重。

  • leader在调用Start提交日志后去获取waitCh来阻塞读 以及 applier 在commit日志并应用于状态机之后获取waitCh来写入response 这二者之间顺序无法保证。因此channel容量设置为1,先获取channel的协程要负责创建channel,这个过程要加写锁。

func (kv *KVServer) getWaitCh(index int) chan *CommandResponse {
	ch, ok := kv.waitChs[index]
	if !ok {
		ch := make(chan *CommandResponse, 1)
		kv.waitChs[index] = ch
		return ch
	}

	return ch
}

snapshot

part B中要求我们在raft state size达到阈值时给raft层下发快照。快照中不仅需要包含KV状态机,还需要包含lastSessions客户端请求去重表。由于快照是和lastIncludeIndex对应的,所以需要由applier协程在将对应的index的日志应用于状态机后继续阻塞的生成快照。

测试结果

3A

Test: one client (3A) ...
  ... Passed --  15.1  5  9881  941
Test: ops complete fast enough (3A) ...
  ... Passed --  15.5  3  7032    0
Test: many clients (3A) ...
  ... Passed --  15.6  5 11160 1171
Test: unreliable net, many clients (3A) ...
  ... Passed --  16.2  5  7462  901
Test: concurrent append to same key, unreliable (3A) ...
  ... Passed --   1.5  3   282   52
Test: progress in majority (3A) ...
  ... Passed --   1.1  5   108    2
Test: no progress in minority (3A) ...
  ... Passed --   1.0  5   184    3
Test: completion after heal (3A) ...
  ... Passed --   1.0  5    63    3
Test: partitions, one client (3A) ...
  ... Passed --  22.5  5  9323  643
Test: partitions, many clients (3A) ...
  ... Passed --  23.3  5 18230  837
Test: restarts, one client (3A) ...
  ... Passed --  22.0  5 12689  822
Test: restarts, many clients (3A) ...
  ... Passed --  22.9  5 24109 1144
Test: unreliable net, restarts, many clients (3A) ...
  ... Passed --  25.7  5  9398  873
Test: restarts, partitions, many clients (3A) ...
  ... Passed --  30.3  5 19984  848
Test: unreliable net, restarts, partitions, many clients (3A) ...
  ... Passed --  29.9  5  7443  622
Test: unreliable net, restarts, partitions, random keys, many clients (3A) ...
  ... Passed --  32.9  7 14185  904
PASS
ok  	6.824/kvraft	277.625s

3B

Test: InstallSnapshot RPC (3B) ...
  ... Passed --   3.4  3  2151   63
Test: snapshot size is reasonable (3B) ...
  ... Passed --   2.8  3  5727  800
Test: ops complete fast enough (3B) ...
  ... Passed --   3.2  3  6962    0
Test: restarts, snapshots, one client (3B) ...
  ... Passed --  21.2  5 46519 4431
Test: restarts, snapshots, many clients (3B) ...
  ... Passed --  21.6  5 53053 4531
Test: unreliable net, snapshots, many clients (3B) ...
  ... Passed --  15.9  5 11057 1341
Test: unreliable net, restarts, snapshots, many clients (3B) ...
  ... Passed --  22.3  5 12478 1412
Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ...
  ... Passed --  29.6  5  8653  760
Test: unreliable net, restarts, partitions, snapshots, random keys, many clients (3B) ...
  ... Passed --  31.6  7 25416 1802
PASS
ok  	6.824/kvraft	151.660s
Built with Hugo
Theme Stack designed by Jimmy