6.5840 Lab 2 - Key / Value Server

实验指导:6.5840 Lab 2: Key / Value Server

这个实验要实现一个单机的、线形一致的 KV server,并用这个 KV server 实现一个分布式锁。

KV Server / Client

Server

这里的线性一致,强调:

  • 首先 Put / Get 操作是原子的,这意味着:后来的 Get 只能读到 Put 写完的结果,不会读到一个写了一半的;A 和 B 同时 Put 同一个 Key,结果要么是 A 覆盖 B,要么是 B 覆盖 A。在这个实验中可以用互斥锁来实现这个原子性。
  • At-most-once - 每个 Put 操作最多被操作一次。这里的挑战在于网络是不可靠的,我们需要处理丢包重传的情况。对于 client 丢包,retry 就可以解决;对于 server 丢包,需要关注这样一种情况:client 尝试 Put 一组 KV,server 收到了请求并且处理了请求,但是 server 的 reply 丢了,此时 client 做 retry Put 同一组 KV,server 需要识别出这是一个 retry 并且不做处理。这个实验给每个 Put request 制定了一个 version 字段来解决这个问题。

Version 的具体做法是:server 为每个 key 维护一个 version,如果 client 尝试 Put 的 version 和 server 维护的 version 不相符的话,server 返回一个 ErrVersion;每次成功 Put(k, v) 的时候,k 对应的 version 就 +1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
type VersionedValue struct {
Value string
Version rpc.Tversion
}

type KVServer struct {
mu sync.Mutex
store map[string]VersionedValue
}

func MakeKVServer() *KVServer {
kv := &KVServer{}
kv.store = make(map[string]VersionedValue)
return kv
}

func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {
kv.mu.Lock()
defer kv.mu.Unlock()

if v, exists := kv.store[args.Key]; exists {
reply.Value = v.Value
reply.Version = v.Version
reply.Err = rpc.OK
} else {
reply.Err = rpc.ErrNoKey
}
}

func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
kv.mu.Lock()
defer kv.mu.Unlock()

v, exists := kv.store[args.Key]
if exists {
goto handle_key_exists
} else {
goto handle_key_not_exists
}

handle_key_exists:
if args.Version == v.Version {
kv.store[args.Key] = VersionedValue{
Value: args.Value,
Version: v.Version + 1,
}
reply.Err = rpc.OK
} else {
reply.Err = rpc.ErrVersion
}
DPrintf("KVServer.Put called with args: %+v. Reply: %+v", args, reply)
return

handle_key_not_exists:
if args.Version == 0 {
kv.store[args.Key] = VersionedValue{
Value: args.Value,
Version: v.Version + 1,
}
reply.Err = rpc.OK
} else {
reply.Err = rpc.ErrNoKey
}
DPrintf("KVServer.Put called with args: %+v. Reply: %+v", args, reply)
}

Client

这里的 Clerk 是客户端和服务器做 RPC 通信的 Proxy,它负责对 RPC 做封装、对网络失败做 retry 以及做 error handling。上层应用只想简单调用 Put(k, v, ver) 和 Get(k),并不关心网络是怎么通信的。

为了防止把服务器打挂,客户端在每次重试之前要 sleep 一段时间。在现实世界中,客户端是不能无限重试的,重试达到一定的次数之后依旧失败的话,可以认为服务器挂掉了;并且在工业级的系统中往往还需要带抖动的指数退避(Exponential Backoff with Jitter)。但是在这个实验中,我们不需要这么做。

对于前文中提到的服务器成功处理请求后 reply 丢包,客户端发起重试,服务端可以通过 version 来判断是否需要执行当前的 Put,客户端收到 ErrVersion。但是客户端其实没有办法分辨这个 ErrVersion 到底是因为其他客户端 Put 了这个 key 还是因为自己 retry 前的 Put 已经被执行了。这个时候,如果客户端第一次 Put 就收到了 ErrVersion,那就说明一定是其他客户端 Put 了这个 key,给应用返回 ErrVersion;否则,重试的时候返回的 ErrVersion 可能是两种情况中的一种,给应用返回 ErrMaybe。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
args := rpc.GetArgs{Key: key}

for {
reply := rpc.GetReply{}
ok := ck.clnt.Call(ck.server, "KVServer.Get", &args, &reply)

if ok && reply.Err == rpc.OK {
return reply.Value, reply.Version, rpc.OK
}

if ok && reply.Err == rpc.ErrNoKey {
return "", 0, rpc.ErrNoKey
}

time.Sleep(100 * time.Millisecond)
}
}

func (ck *Clerk) Put(key, value string, version rpc.Tversion) rpc.Err {
args := rpc.PutArgs{Key: key, Value: value, Version: version}
firstRpc := true

for {
reply := rpc.PutReply{}
ok := ck.clnt.Call(ck.server, "KVServer.Put", &args, &reply)

if ok && reply.Err == rpc.OK {
return rpc.OK
}

if ok && reply.Err == rpc.ErrNoKey {
return rpc.ErrNoKey
}

if ok && reply.Err == rpc.ErrVersion {
if firstRpc {
return rpc.ErrVersion
} else {
return rpc.ErrMaybe
}
}

firstRpc = false

time.Sleep(100 * time.Millisecond)
}
}

分布式锁

下面我们要用这个 KV server 实现一个分布式锁。

这个实验中要实现的分布式锁的本质可以理解为分布式的 spin-lock:通过 CAS 机制来 acquire / release,这一点和 6.1810 xv6 kernel 中的 spin-lock 十分相似。

这里对于一个分布式锁,不光需要记录这个锁是否被 acquired,还有记录谁正在使用这个锁被谁持有,以免发生 A 持有了锁被 B 释放的情况。所以 lock state 应该维护一个持有者的 client ID,在获得锁的时候,用 CAS Put(lock-name, client-id);在释放锁的时候,先判断是否被当前用户持有,如果是的话就把 lock-name 对应的 client-id Put 成空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
type Lock struct {
ck kvtest.IKVClerk
name string
clientId string
}

const OwnerNone string = ""

func (lk *Lock) Acquire() {
for {
value, version, rpcErr := lk.ck.Get(lk.name)

if rpcErr == rpc.OK {
if value != OwnerNone && value != lk.clientId {
// Leased by others.
goto sleep_and_retry
}

if lk.ck.Put(lk.name, lk.clientId, version) == rpc.OK {
// Acquired.
return
} else {
// Version changed, retry.
goto sleep_and_retry
}
}

if rpcErr == rpc.ErrNoKey {
if lk.ck.Put(lk.name, lk.clientId, 0) == rpc.OK {
// Acquired.
return
} else {
// Version changed, retry.
goto sleep_and_retry
}
}

sleep_and_retry:
time.Sleep(100 * time.Millisecond)
}
}

func (lk *Lock) Release() {
for {
value, version, rpcErr := lk.ck.Get(lk.name)

if rpcErr == rpc.ErrNoKey || value != lk.clientId {
return
}

rpcErr = lk.ck.Put(lk.name, OwnerNone, version)
if rpcErr == rpc.OK {
return
}

// ErrMaybe: retry to ensure the lock is released.
time.Sleep(100 * time.Millisecond)
}
}

起初,我并没有实现这个 unique client ID,获取锁的时候单纯的把 value 设置成了 「locked」,也通过了测试,但是后来发现实验指导里提到使用 kvtest.RandomValue(8) 生成随机 string 作为 client ID 的提示,意识单纯用 「locked」 表示状态是不够的,还需要记录谁持有锁。加了一个 test case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func TestNonOwnerReleaseReliable(t *testing.T) {
ts := kvsrv.MakeTestKV(t, true)
defer ts.Cleanup()
ck := ts.MakeClerk()

lkA := MakeLock(ck, "test-lock")
lkB := MakeLock(ck, "test-lock")

// Step 1: A acquires the lock
lkA.Acquire()

// Step 2: B (maliciously) calls Release
lkB.Release()

// Step 3: C tries to acquire the lock;
// should be blocked since A is still holding it
lkC := MakeLock(ck, "test-lock")

done := make(chan bool)
go func() {
lkC.Acquire()
done <- true
}()

select {
case <-done:
t.Fatalf("Fatal Error: C acquired lock while A is still holding it! Mutual exclusion failed.")
case <-time.After(1 * time.Second):
// Passed: C is correctly blocked.
}
}

实验结果

KV server:

Lock: