实验指导:6.5840 Lab
2: Key / Value Server
这个实验要实现一个单机的、线形一致的 KV server,并用这个 KV server
实现一个分布式锁。
KV Server / Client
Server
这里的线性一致,强调:
首先 Put / Get 操作是原子的,这意味着:后来的 Get 只能读到 Put
写完的结果,不会读到一个写了一半的;A 和 B 同时 Put 同一个
Key,结果要么是 A 覆盖 B,要么是 B 覆盖
A。在这个实验中可以用互斥锁来实现这个原子性。
At-most-once - 每个 Put
操作最多被操作一次。这里的挑战在于网络是不可靠的,我们需要处理丢包重传的情况。对于
client 丢包,retry 就可以解决;对于 server
丢包,需要关注这样一种情况:client 尝试 Put 一组 KV,server
收到了请求并且处理了请求,但是 server 的 reply 丢了,此时 client 做
retry Put 同一组 KV,server 需要识别出这是一个 retry
并且不做处理。这个实验给每个 Put request 制定了一个 version
字段来解决这个问题。
Version 的具体做法是:server 为每个 key 维护一个 version,如果 client
尝试 Put 的 version 和 server 维护的 version 不相符的话,server 返回一个
ErrVersion;每次成功 Put(k, v) 的时候,k 对应的 version 就 +1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 type VersionedValue struct { Value string Version rpc.Tversion } type KVServer struct { mu sync.Mutex store map [string ]VersionedValue } func MakeKVServer () *KVServer { kv := &KVServer{} kv.store = make (map [string ]VersionedValue) return kv } func (kv *KVServer) Get (args *rpc.GetArgs, reply *rpc.GetReply) { kv.mu.Lock() defer kv.mu.Unlock() if v, exists := kv.store[args.Key]; exists { reply.Value = v.Value reply.Version = v.Version reply.Err = rpc.OK } else { reply.Err = rpc.ErrNoKey } } func (kv *KVServer) Put (args *rpc.PutArgs, reply *rpc.PutReply) { kv.mu.Lock() defer kv.mu.Unlock() v, exists := kv.store[args.Key] if exists { goto handle_key_exists } else { goto handle_key_not_exists } handle_key_exists: if args.Version == v.Version { kv.store[args.Key] = VersionedValue{ Value: args.Value, Version: v.Version + 1 , } reply.Err = rpc.OK } else { reply.Err = rpc.ErrVersion } DPrintf("KVServer.Put called with args: %+v. Reply: %+v" , args, reply) return handle_key_not_exists: if args.Version == 0 { kv.store[args.Key] = VersionedValue{ Value: args.Value, Version: v.Version + 1 , } reply.Err = rpc.OK } else { reply.Err = rpc.ErrNoKey } DPrintf("KVServer.Put called with args: %+v. Reply: %+v" , args, reply) }
Client
这里的 Clerk 是客户端和服务器做 RPC 通信的 Proxy,它负责对 RPC
做封装、对网络失败做 retry 以及做 error handling。上层应用只想简单调用
Put(k, v, ver) 和 Get(k),并不关心网络是怎么通信的。
为了防止把服务器打挂,客户端在每次重试之前要 sleep
一段时间。在现实世界中,客户端是不能无限重试的,重试达到一定的次数之后依旧失败的话,可以认为服务器挂掉了;并且在工业级的系统中往往还需要带抖动的指数退避(Exponential
Backoff with Jitter)。但是在这个实验中,我们不需要这么做。
对于前文中提到的服务器成功处理请求后 reply
丢包,客户端发起重试,服务端可以通过 version 来判断是否需要执行当前的
Put,客户端收到 ErrVersion。但是客户端其实没有办法分辨这个 ErrVersion
到底是因为其他客户端 Put 了这个 key 还是因为自己 retry 前的 Put
已经被执行了。这个时候,如果客户端第一次 Put 就收到了
ErrVersion,那就说明一定是其他客户端 Put 了这个 key,给应用返回
ErrVersion;否则,重试的时候返回的 ErrVersion
可能是两种情况中的一种,给应用返回 ErrMaybe。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (ck *Clerk) Get (key string ) (string , rpc.Tversion, rpc.Err) { args := rpc.GetArgs{Key: key} for { reply := rpc.GetReply{} ok := ck.clnt.Call(ck.server, "KVServer.Get" , &args, &reply) if ok && reply.Err == rpc.OK { return reply.Value, reply.Version, rpc.OK } if ok && reply.Err == rpc.ErrNoKey { return "" , 0 , rpc.ErrNoKey } time.Sleep(100 * time.Millisecond) } } func (ck *Clerk) Put (key, value string , version rpc.Tversion) rpc .Err { args := rpc.PutArgs{Key: key, Value: value, Version: version} firstRpc := true for { reply := rpc.PutReply{} ok := ck.clnt.Call(ck.server, "KVServer.Put" , &args, &reply) if ok && reply.Err == rpc.OK { return rpc.OK } if ok && reply.Err == rpc.ErrNoKey { return rpc.ErrNoKey } if ok && reply.Err == rpc.ErrVersion { if firstRpc { return rpc.ErrVersion } else { return rpc.ErrMaybe } } firstRpc = false time.Sleep(100 * time.Millisecond) } }
分布式锁
下面我们要用这个 KV server 实现一个分布式锁。
这个实验中要实现的分布式锁的本质可以理解为分布式的 spin-lock:通过
CAS 机制来 acquire / release,这一点和 6.1810 xv6 kernel 中的 spin-lock
十分相似。
这里对于一个分布式锁,不光需要记录这个锁是否被
acquired,还有记录谁正在使用这个锁被谁持有,以免发生 A 持有了锁被 B
释放的情况。所以 lock state 应该维护一个持有者的 client
ID,在获得锁的时候,用 CAS Put(lock-name,
client-id);在释放锁的时候,先判断是否被当前用户持有,如果是的话就把
lock-name 对应的 client-id Put 成空。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 type Lock struct { ck kvtest.IKVClerk name string clientId string } const OwnerNone string = "" func (lk *Lock) Acquire () { for { value, version, rpcErr := lk.ck.Get(lk.name) if rpcErr == rpc.OK { if value != OwnerNone && value != lk.clientId { goto sleep_and_retry } if lk.ck.Put(lk.name, lk.clientId, version) == rpc.OK { return } else { goto sleep_and_retry } } if rpcErr == rpc.ErrNoKey { if lk.ck.Put(lk.name, lk.clientId, 0 ) == rpc.OK { return } else { goto sleep_and_retry } } sleep_and_retry: time.Sleep(100 * time.Millisecond) } } func (lk *Lock) Release () { for { value, version, rpcErr := lk.ck.Get(lk.name) if rpcErr == rpc.ErrNoKey || value != lk.clientId { return } rpcErr = lk.ck.Put(lk.name, OwnerNone, version) if rpcErr == rpc.OK { return } time.Sleep(100 * time.Millisecond) } }
起初,我并没有实现这个 unique client ID,获取锁的时候单纯的把 value
设置成了 「locked」,也通过了测试,但是后来发现实验指导里提到使用
kvtest.RandomValue(8) 生成随机 string 作为 client ID 的提示,意识单纯用
「locked」 表示状态是不够的,还需要记录谁持有锁。加了一个 test
case:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func TestNonOwnerReleaseReliable (t *testing.T) { ts := kvsrv.MakeTestKV(t, true ) defer ts.Cleanup() ck := ts.MakeClerk() lkA := MakeLock(ck, "test-lock" ) lkB := MakeLock(ck, "test-lock" ) lkA.Acquire() lkB.Release() lkC := MakeLock(ck, "test-lock" ) done := make (chan bool ) go func () { lkC.Acquire() done <- true }() select { case <-done: t.Fatalf("Fatal Error: C acquired lock while A is still holding it! Mutual exclusion failed." ) case <-time.After(1 * time.Second): } }
实验结果
KV server:
Lock: