课程介绍
Distributed Systems课程一共包含Lab1-4共4个大作业,Lab1是实现Mapreduce原型,Lab2-4是实现Raft以及基于Raft实现分布式KV存储。
本次实现Raft,该实验(2020 年版本)分为三个部分,目标是开发一个容错的KV系统,分别是 Part 2A:leader 选举、Part 2B:日志同步、lab2C:状态备份。
实验要求

- 每个 Replica Group 负责存储一组 K/V,通过 Raft 实现组内一致性;
- Shard Master 管理配置信息,决定如何分片,通过 Raft 实现容错;
- 客户端请求 Shard Master 来查询 Key 所在的 Replica Group,可以将配置信息存储在客户端本地,可直在请求失败时才重新获取配置;
- 分片需要在不同 Replica Group 之间移动,以实现 Replica Group 加入或离开集群时负载均衡;
- 负载均衡时,需要尽量少地移动分片;
- Shard Master 需要保留历史配置,不同的配置直接拥有唯一编号标示。第一个配置的编号为 0,不包含任何 Replica Group,所有的碎片都被分配到 GID 0(此时 GID 0 是无效的);
- 分片的数量远多于 Replica Groups 数量,即每个 Replica Group 管理许多分片,以支持细粒度的负载转移;
- Shard Master 需要实现如下 RPC 接口:
Join
:加入新 Replica Group;
Leave
:将 Replica Group 从集群剔除;
Move
:将分片移动到指定 Replica Group,单纯用于测试,之后的Join
和Leave
触发的负载均衡将覆盖Move
的影响;
Query
:查询特定编号的配置,如果参数为 -1 或大于最大编号,则返回最新配置;
- Replica Group 至少需要 100ms/次 的频率向 Shard Master 了解最新配置;
- 允许在迁移完分片后,不删除不再拥有的分片,这样可以简化服务的设计;
- 迁移分片时,允许直接发送整个数据库的数据,以简化传输逻辑;
Shard Master
逻辑流程与 Lab 3 的 kvraft 一致,将Get
、PutAppend
的处理换成了Join
、Leave
、Move
、Query
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| type groupSlot struct { gid int valid bool shards []int }
type ShardMaster struct { slots []groupSlot }
func (sm *ShardMaster) join(servers map[int][]string) { shards := sm.lastedConfig().Shards groups := make(map[int][]string) for gid, servers := range sm.lastedConfig().Groups { groups[gid] = servers } }
func (sm *ShardMaster) leave(gids []int) { }
func (sm *ShardMaster) move(shard, gid int) { }
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster { var shards []int for shard := 0; shard < NShards; shard++ { shards = append(shards, shard) } sm.slots = append(sm.slots, groupSlot{0, false, shards}) }
|
负载均衡策略:
- 为了尽可能少的转移分片,不应该以最大
slot
到最小slot
的方式循环迭代转移(如图箭头 2,slot0
、slot1
都需要需要同时和slot2
、slot3
通信),而应该批量转移(如图箭头 1,slot0
只需要和slot2
通信,slot1
只需要和slot3
通信);

- 负载均衡状态下的容量上限不是唯一值(如图),要确定每个
slot
需要转移(接受)多少分片,等价于,确定每个slot
应该满足哪个容量上限。为了尽可能少的转移分片,应该让含有分片多的slot
满足limMax
,让含有分片少的slot
满足limMin
;

负载均衡实现:
- 对
slots
排序,优先valid == false
,shards
多的;
- 更新
slots
,将前面invalid
的裁了;
- 令前的
nMax
个slot
需满足limMax
,后nMin
个需满足limMin
(如图);
- 将
invalid
中的分片转移到不足容量上限的slot
;
- 将超过容量上限分片转移到不足容量上限的
slot
;

Shift Shards
思路
为了避免重复请求,只由 Leader 向 Shard Master 请求配置,再使用 Raft 达成共识;
为了避免重复请求,只由 Leader 进行分片转移,再使用 Raft 达成共识;
应该单独开一个 goroutine 主动检查并进行分片转移,而非配置更新时被动触发,可能分片转移的结果没能在 Raft 达到共识,需要重试;
一次分片转移的完整过程为:从 Raft 中读取到配置更新,到 Raft 中读取到分片更新,在这期间应该停止对旧分片和新分片的操作,避免转移的结果覆盖操作;
不应该直接更新到最新的配置,而应该更新到下一任配置,预防出现这样的情况:
- 分片一开始在 A Group;
- 分片转移到 B Group;
- 分片转移到 C Group;
C Group 直接从配置 1 更新到配置 3,从而只从 A Group 请求分片,丢失了 B Group 的更新;
应该拒绝将分片转移给配置比自己新的 Group,预防 A Group 还没更新配置,B Group 先更新并从 A Group 获取了分片,之后 A Group 执行的数据更新操作都没有转移到 B Group;
应该针对每个分片都维护一个“每个客户端最后执行的命令 Id”来避免操作重复,在转移分片的时候将该记录也转移,预防 A Group 执行完了操作但没向客户端反馈,接着将分片转移给 B Group,客户端继续向 B Group 发送重复请求;
流程

在 Lab 3 kvraft 的基础上实现,省略了 kvraft 的部分
实现细节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| func (kv *ShardKV) genSnapshot() []byte { e.Encode(kv.oldConfig) e.Encode(kv.shifting) }
func (kv *ShardKV) applyCommand(op Op) { if kv.shifting || kv.config.Shards[key2shard(op.Key)] != kv.gid { return } }
func (kv *ShardKV) applyPulled(pulled Pulled) { if !kv.shifting || pulled.ConfigNum != kv.config.Num { return } for _, shard := range pulled.Shards { for clerkId, opId := range pulled.LastApplied[shard] { kv.lastApplied[shard][clerkId] = opId } } }
func (kv *ShardKV) ShiftShards(args *ShiftArgs, reply *ShiftReply) { kv.registerFeedback(feedbackCh, opId, func() { reply.KV = make(map[string]string) for _, shard := range args.Shards { for k, v := range kv.kv { if key2shard(k) == shard { reply.KV[k] = v reply.LastApplied[shard] = make(map[int64]int64) for clerkId, opId := range kv.lastApplied[shard] { reply.LastApplied[shard][clerkId] = opId } } } } }) }
func (kv *ShardKV) checkPull() { for { var pulled Pulled pulled.KV = make(map[string]string) pulled.ConfigNum = config.Num for gid, shards := range tobePull { servers := oldConfig.Groups[gid] } } }
func (kv *ShardKV) applyConfig(config shardmaster.Config) { if kv.config.Num >= config.Num || kv.shifting { return } kv.oldConfig = kv.config }
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) { kv.registerFeedback(feedbackCh, opId, func() { if kv.shifting || kv.config.Shards[key2shard(args.Key)] != kv.gid { reply.Err = ErrWrongGroup return } reply.Value = kv.kv[args.Key] reply.Err = OK }) }
|
Challenge(TODO)
删除分片思路:
- 防止 Raft 日志重放时触发分片转移,分片转移后应该 Snapshot;
- Snapshot 结束后,向源 Group 发送一个 RPC 信号,通知其可以删除分片;