YoloKokura

NOTE:本文为6.824(分布式系统)Lab 4的回顾,实验要求见这里。因为要遵守课程的Collaboration Policy,所以本文不会分享任何实现细节的代码(可能还是会有一些逻辑性的简单代码帮助阐明思路)。

我们在Lab 3中基于Raft算法实现了简单的分布式KV存储系统,服务器维护一个统一的map,以存储KV,并支持快照存储数据库状态。但是在实际项目里,服务器可能无法承载过量数据,我们自然想要将数据分配到不同的子集群上,当子集群出于某些原因需要暂时下线时,我们还需要做好分片迁移工作,这意味着我们需要另一个服务器(或者集群,这样容错更强)负责配置数据的分配,其地位相当于MapReduce的Coordinator。

服务架构

上图中的shardctrlershardkv都是基于Raft的集群,前者负责维护一个数据分片配置文件,后者将定期从shardctrler中fetch配置文件,并根据最新配置向兄弟集群请求数据分片。

shardctrler

shardctrler支持四种RPC接口:

  • Join:一个新的子集群需要接入整个集群。
  • Leave:一个子集群要离开集群。
  • Move:将一个分片从A子集群迁移到B子集群中。
  • Query:子集群LEADER或者用户查询指定版本的数据分片配置情况。

每当有子集群调用Join或者Leave时,shardctrler将自动修改当前配置情况,更新配置文件后再返回RPC。

关于自动配置,我采用的是尽量在每个子集群平均分配分片,并且尽量减少分片迁移的次数,我想这部分策略应该也可以让用户配置。

配置的结构大致如下:

go
type Config struct {
    Num int
    Shards [NShards]int
    Groups map[int][]string
}

该结构体记录了版本号、数据切片的分配情况(分片id -> 集群id)和集群中其它成员信息(集群id -> 主机名),用于互相传递消息。

每当Config更新时,版本号就会递增,以便子集群判断是否要进行数据分片迁移。

shardkv

shardkv的大部分逻辑和Lab 3别无二致,只是之前存储所有的数据,现在只需要存储一个数据分片。用户在进行数据库相关操作时,也会向shardctrl查询分片配置情况,然后自行计算数据对应分片,向负责的子集群LEADER发送RPC。

真正复杂的地方在于,如何实现配置的更新和数据迁移。

首先看配置更新,服务器有一个独立的goroutine周期性访问shardctrler调用Query RPC,但是每次只会请求下一个版本的Config,这样可以保证整个大集群能够渐进式地往最新版本迁移,否则一旦有某个集群意外离开大集群,同时没有将自身的数据分片迁移,那么所有的集群将无法按照配置迁移其他数据。

我是用拉取的方式来实现分片迁移,但推方式似乎也没有任何问题,为了保证高可用性,一旦服务器不再对外提供某个分片的服务,就会自行进行清理。

上述两种行为也是通过goroutine对现有数据分片状态进行周期性扫描来实现的,结合前面的周期性获取下一版本Config,服务器一共有三个守护goroutine:

  1. query:向shardctrler周期性请求下一版本Config,如果请求成功,则开始分片迁移工作。
  2. puller:周期性检查切片状态,如果发现需要拉取的分片,根据上一版本Config的信息寻找前主人集群,拉取分片。
  3. eraser:周期性检查切片状态,如果发现需要清理的分片,根据最新协议询问新主人是否得到了该分片,然后进行清理。

为了代码的简洁,我们也可以使用一个updater函数来封装周期性执行函数的逻辑,使前面三个函数之需要考虑自身逻辑。

可以看到,pullereraser其实都是以切片状态驱动的,有点像是一个状态机。我定义的切片状态如下:

go
type ShardState int

const (
    Serving ShardState = iota
    Pulling
    Pulled
    Removing
)
  • Serving:本集群在最新版本Config中提供该分片服务,且已经拿到该分片,正在服务。
  • Pulling:本集群在最新版本Config中提供该分片服务,但尚未拉取到该分片,正在拉取过程中。
  • Pulled:本集群在最新版本Config中不再提供该分片服务,即被拉取的一方。
  • Removing:本集群在最新版本Config中提供该分片服务,且已经获取到该分片,需要其原主人进行分片清理。

我们假设在初始状态,每个子集群都已经拿到了它应该存储的数据切片,在下一个版本的Config中,切片分配存在变化。

首先,服务器向shardctrler请求下一版本Config,拿到之后向自己集群同步Reconfiguration操作。在该操作中,服务器将已有的Config放到preConfig中,以便后续数据迁移时使用,然后根据两个版本Config的区别,寻找如下两种情况:

  1. 新Config中需要新增的数据分片,标记为Pulling
  2. 新Config中删除了的数据分片,标记为Pulled

我们的puller协程发现有数据分片为Pulling状态,即本集群应该提供该分片的服务,但是还没有拉取到该分片,此时参考preConfig中的信息,找到该分片的原主人,拉取分片数据,如果成功,则向子集群同步INSERT_SHARD操作。

我们的eraser协程遍历数据分片后发现,有分片处于Removing状态,因此首先确认对方已经删除该分片,然后同步调整我方集群的分片状态为Serving

上述两个步骤是在RPC发送方的角度描述的,对于RPC接收方,不论收到的是什么请求,首先对比两边的Config版本(对方的Config会写在请求参数里),只有两边Config相同,才往下执行。

对于数据拉取的请求,我们同时对分片数据操作历史进行深拷贝,然后发送给对方,发送操作历史是为了方便对方忽略已经处理过的请求,因为可能存在用户在切换了目标分片服务器之后,重新发送冗余请求的可能。

对于分片擦除的请求,我方检查分片状态,如果是Pulled,说明状态正常,可以清理该分片,否则向对方报错。

总结来看,分片迁移过程中,拉取一方的数据状态变化为:Pulling -> Removing -> Serving,而被拉取一方则是:Serving -> Pulled -> Serving。我这里同时用Serving标识正在服务状态和已经删除后的状态,因为用户在判断目标子集群时总是通过Config得到的,不会被干扰,对一个服务器来说,则可利用Serving状态和Config信息共同判断自己是否提供某个数据分片的服务。

Tags: