Redis Cluster vs Codis Slot Rebalance 算法
Redis Cluster Slot Rebalance算法
迁移原则
- 尽可能的均匀分配Slots;
- 尽量减少迁移的Slots的数量;
Slots分配方案
- 计算集群的总权重,节点数量;
- 计算每个集群节点需要移出或移入的槽数,balance 如果为正数代表需要移出的槽数,如果为负数代表需要出入槽数;
- 把weightedNodes 根据 balance 排序从小到大;
- 执行槽位移动,并更新集群节点 balance;
代码实现
static int clusterManagerCommandRebalance(int argc, char **argv) {
int port = 0;
char *ip = NULL;
clusterManagerNode **weightedNodes = NULL;
list *involved = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int result = 1, i;
if (config.cluster_manager_command.weight != NULL) {
for (i = 0; i < config.cluster_manager_command.weight_argc; i++) {
char *name = config.cluster_manager_command.weight[i];
char *p = strchr(name, '=');
if (p == NULL) {
clusterManagerLogErr("*** invalid input %s\n", name);
result = 0;
goto cleanup;
}
*p = '\0';
float w = atof(++p);
clusterManagerNode *n = clusterManagerNodeByAbbreviatedName(name);
if (n == NULL) {
clusterManagerLogErr("*** No such master node %s\n", name);
result = 0;
goto cleanup;
}
n->weight = w;
}
}
float total_weight = 0;
int nodes_involved = 0;
int use_empty = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
involved = listCreate();
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
/* Compute the total cluster weight. */
// 计算集群的总权重
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
continue;
if (!use_empty && n->slots_count == 0) {
n->weight = 0;
continue;
}
total_weight += n->weight;
nodes_involved++; // 集群节点数量
listAddNodeTail(involved, n);
}
// 分配内存空间
weightedNodes = zmalloc(nodes_involved * sizeof(clusterManagerNode *));
if (weightedNodes == NULL) goto cleanup;
/* Check cluster, only proceed if it looks sane. */
// 检查集群,只有在它看起来正常时才继续。
clusterManagerCheckCluster(1);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
clusterManagerLogErr("*** Please fix your cluster problems "
"before rebalancing\n");
result = 0;
goto cleanup;
}
/* Calculate the slots balance for each node. It's the number of
* slots the node should lose (if positive) or gain (if negative)
* in order to be balanced. */
// 计算每个节点的槽位数量。
// 它是节点为了平衡而应该移出(如果为正)或移入(如果为负)的槽位数量。
int threshold_reached = 0, total_balance = 0;
float threshold = config.cluster_manager_command.threshold;
i = 0;
listRewind(involved, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
weightedNodes[i++] = n;
// 计算预期槽数
int expected = (int) (((float)CLUSTER_MANAGER_SLOTS / total_weight) *
n->weight);
// 均衡前的哈希槽位和均衡后的哈希槽位的差值
n->balance = n->slots_count - expected;
total_balance += n->balance;
/* Compute the percentage of difference between the
* expected number of slots and the real one, to see
* if it's over the threshold specified by the user. */
// 计算期望槽数与实际槽数之间的差异百分比,看它是否超过用户指定的阈值。
int over_threshold = 0;
if (threshold > 0) {
if (n->slots_count > 0) {
float err_perc = fabs((100-(100.0*expected/n->slots_count)));
if (err_perc > threshold) over_threshold = 1;
} else if (expected > 1) {
over_threshold = 1;
}
}
if (over_threshold) threshold_reached = 1; // 标记需要平衡
}
if (!threshold_reached) {
clusterManagerLogWarn("*** No rebalancing needed! "
"All nodes are within the %.2f%% threshold.\n",
config.cluster_manager_command.threshold);
goto cleanup;
}
/* Because of rounding, it is possible that the balance of all nodes
* summed does not give 0. Make sure that nodes that have to provide
* slots are always matched by nodes receiving slots. */
// 由于四舍五入,所有节点的槽数总和可能不为 0。必须确保移出的槽数与移入的槽数匹配。
while (total_balance > 0) {
listRewind(involved, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
// 把多余槽数轮询分发给移入的节点
if (n->balance <= 0 && total_balance > 0) {
n->balance--;
total_balance--;
}
}
}
/* Sort nodes by their slots balance. */
// 按槽数对节点进行排序。
qsort(weightedNodes, nodes_involved, sizeof(clusterManagerNode *),
clusterManagerCompareNodeBalance); // 根据 balance 排序从小到大
clusterManagerLogInfo(">>> Rebalancing across %d nodes. "
"Total weight = %.2f\n",
nodes_involved, total_weight);
if (config.verbose) {
for (i = 0; i < nodes_involved; i++) {
clusterManagerNode *n = weightedNodes[i];
printf("%s:%d balance is %d slots\n", n->ip, n->port, n->balance);
}
}
/* Now we have at the start of the 'sn' array nodes that should get
* slots, at the end nodes that must give slots.
* We take two indexes, one at the start, and one at the end,
* incrementing or decrementing the indexes accordingly til we
* find nodes that need to get/provide slots. */
// 现在我们在“sn”数组节点的开始处应该获得槽,在末端节点处必须提供槽。
// 我们有两个索引,一个在开始,一个在结束,
// 相应地递增或递减索引,直到我们找到需要移出/移入槽的节点。
int dst_idx = 0;
int src_idx = nodes_involved - 1;
int simulate = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_SIMULATE;
// 例如100个槽位,原来4个节点,现在5个节点。
// 25,25,25,25 -> 20,20,20,20,20 weightedNodes:-20,5,5,5,5
while (dst_idx < src_idx) {
// balance<0 表示要从集群其它节点的槽位迁移到本节点的槽位,所以此节点是槽位的目的节点
clusterManagerNode *dst = weightedNodes[dst_idx];
// balance>0 表示要从集群本节点的槽位迁移到其它节点的槽位,所以此节点是槽位的源节点
clusterManagerNode *src = weightedNodes[src_idx];
int db = abs(dst->balance);
int sb = abs(src->balance);
int numslots = (db < sb ? db : sb);
if (numslots > 0) {
printf("Moving %d slots from %s:%d to %s:%d\n", numslots,
src->ip,
src->port,
dst->ip,
dst->port);
/* Actually move the slots. */
// 执行实际移动槽位
list *lsrc = listCreate(), *table = NULL;
listAddNodeTail(lsrc, src);
// 获取要迁移的槽,优先获取移动slot编号小的槽
table = clusterManagerComputeReshardTable(lsrc, numslots);
listRelease(lsrc);
int table_len = (int) listLength(table);
if (!table || table_len != numslots) {
clusterManagerLogErr("*** Assertion failed: Reshard table "
"!= number of slots");
result = 0;
goto end_move;
}
if (simulate) {
for (i = 0; i < table_len; i++) printf("#");
} else {
int opts = CLUSTER_MANAGER_OPT_QUIET |
CLUSTER_MANAGER_OPT_UPDATE;
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
char *err;
// 迁移槽位
result = clusterManagerMoveSlot(item->source,
dst,
item->slot,
opts, &err);
if (!result) {
clusterManagerLogErr("*** clusterManagerMoveSlot: %s\n", err);
zfree(err);
goto end_move;
}
printf("#");
fflush(stdout);
}
}
printf("\n");
end_move:
clusterManagerReleaseReshardTable(table);
if (!result) goto cleanup;
}
/* Update nodes balance. */
// 更新节点 balance。
dst->balance += numslots;
src->balance -= numslots;
if (dst->balance == 0) dst_idx++;
if (src->balance == 0) src_idx --;
}
cleanup:
if (involved != NULL) listRelease(involved);
if (weightedNodes != NULL) zfree(weightedNodes);
return result;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
Codis Slot Rebalance算法
使用场景
Codis
的集群初始为2个分片
,当业务增长需要扩容到4个分片
的时候,我们可以手动指定slots指挥Codis
进行数据迁移,也可以使用AutoRebalance
让Codis
自动的进行Slots数据迁移。
迁移原则
- 尽可能的均匀分配Slots;
- 尽量减少迁移的Slots的数量;
Slots分配方案
- 统计当前迁移中
Slots
的结果,用于当前迁移方案的基础数据; - 按照每个
Group
可分配Slots的最大限制,统计Group中需要迁入/出的Slots
信息; - 依据现有的
Group
中Slots
的数量构建红黑树,统计分配Slots
; - 审核并存储迁移方案;
代码实现
func (s *Topom) SlotsRebalance1(confirm bool) (map[int]int, error) {
s.mu.Lock()
defer s.mu.Unlock()
ctx, err := s.newContext()
if err != nil {
return nil, err
}
// 获取所有group的id,每一个group必须拥有redis实例,
// 依据id从小到大排序group,其中group的id最小值为1
var groupIds []int
for _, g := range ctx.group {
if len(g.Servers) != 0 {
groupIds = append(groupIds, g.Id)
}
}
sort.Ints(groupIds) // 按groupId从小到大排序
if len(groupIds) == 0 { // 如果group为空,返回错误
return nil, errors.Errorf("no valid group could be found")
}
// 每一个分片(组)都拥有3个属性
var (
// 需要给当前的group分配的slots的数量
assigned = make(map[int]int)
// 当前group需要移出的slots信息,其中key为group的id,value为slots的数组
pendings = make(map[int][]int)
// 当前group需要移出/入(为负数时代表移入)的slots数量,
// 其中key为group的id,value为slots的数量
moveout = make(map[int]int)
docking []int // 为需要最终操作的slots的列表
)
// 获取group的当前的slots的数量
var groupSize = func(gid int) int {
return assigned[gid] + len(pendings[gid]) - moveout[gid]
}
// don't migrate slot if it's being migrated
// 如果正在迁移,请不要迁移插槽
// 遍历slots,获取正在迁移中的slots的迁移结果并该结果计入本次的迁移统计
for _, m := range ctx.slots {
if m.Action.State != models.ActionNothing {
assigned[m.Action.TargetId]++ // 累加计算每个groupId已经分配的slot数量
}
}
// 按照平均值计算每个group可以分到的slots的数量下限(总量为1024)
var lowerBound = MaxSlotNum / len(groupIds)
// don't migrate slot if groupSize < lowerBound
// 如果 groupSize < lowerBound,则不要迁移插槽
// 遍历slots,统计需要迁移的slots信息
for _, m := range ctx.slots {
// 对于处于迁移状态中的slots不执行任何操作
if m.Action.State != models.ActionNothing {
continue
}
// 当前的slots属于集群中的一个group
if m.GroupId != 0 {
// slot所归属group中的slots的数量小于group的平均值,
// 则需要往这个group中分配新的slot
if groupSize(m.GroupId) < lowerBound {
assigned[m.GroupId]++
} else {
// slot所归属group中的slots的数量大于group的平均值,
// 则需要将这个slot移出它所归属的group
pendings[m.GroupId] = append(pendings[m.GroupId], m.Id)
}
}
}
// 创建一个自定义比较器的红黑树,这棵树代表着需要进行slots迁移的所有group
// key是group的id,slots最少的在左面,slots最多的在右面,key是group的id
var tree = rbtree.NewWith(func(x, y interface{}) int {
var gid1 = x.(int)
var gid2 = y.(int)
if gid1 != gid2 {
if d := groupSize(gid1) - groupSize(gid2); d != 0 {
return d
}
return gid1 - gid2
}
return 0
})
for _, gid := range groupIds {
tree.Put(gid, nil)
}
// assign offline slots to the smallest group
// 将离线插槽分配给最小的组
for _, m := range ctx.slots { // 遍历所有的slots
// 对于处于迁移状态中的slots不执行任何操作
if m.Action.State != models.ActionNothing {
continue
}
if m.GroupId != 0 {
continue
}
// 有一些slots不属于任何group,需要将这些slots分配给slots最少的group,
// 也就是红黑树左面的最小的group
dest := tree.Left().Key.(int)
tree.Remove(dest)
docking = append(docking, m.Id) // 对接,记录slot
moveout[dest]-- // 记录最小分组需要move的个数
tree.Put(dest, nil)
}
// 每一个group能够获取slots的数量的上限,其实约等于 lowerBound + 1
var upperBound = (MaxSlotNum + len(groupIds) - 1) / len(groupIds)
// rebalance between different server groups
// 在不同的服务器组之间重新平衡
// 树中需要迁移的group大于等于2则需要进行rebalance,只有一个group就不需要了
for tree.Size() >= 2 {
from := tree.Right().Key.(int)
tree.Remove(from)
// 当前group已经把所有需要移出的slots迁移出完毕
if len(pendings[from]) == moveout[from] {
continue
}
dest := tree.Left().Key.(int)
tree.Remove(dest)
var (
fromSize = groupSize(from)
destSize = groupSize(dest)
)
// 右面的group中slots的数量小于等于每个group的平均值,
// 则表示该group迁移完成,不需要再次加入tree中
if fromSize <= lowerBound {
break
}
// 左面的group中slots的数量大于等于每个group的最大值,
// 则表示该group也迁移完成,不需要再次加入tree中
if destSize >= upperBound {
break
}
// 左右group中的slots的数量相差小于等于1,
// 则表示这个两个group也不需要再次加入tree中了
if d := fromSize - destSize; d <= 1 {
break
}
// 右面的group移出了一个,左面的group加入了一个
moveout[from]++
moveout[dest]--
// 还需要继续迁移,将这两个group继续加入树中
tree.Put(from, nil)
tree.Put(dest, nil)
}
for gid, n := range moveout {
if n < 0 {
continue
}
// 当前group需要移出n个slots
if n > 0 {
// 倒序遍历需要移出的slots的列表,将需要迁移的slots加入到docking中
sids := pendings[gid]
sort.Sort(sort.Reverse(sort.IntSlice(sids)))
docking = append(docking, sids[0:n]...)
pendings[gid] = sids[n:]
}
delete(moveout, gid)
}
sort.Ints(docking) // 排序需要操作的slots列表
// 遍历group,获取每一个group需要迁入多少个slots并将docking中的slots分配给对应的group,
// plans就是最终的分配方案,将某一个slot分配给某一个group
var plans = make(map[int]int)
// 构建迁移方案,填充平衡需要移动slot=>gid
for _, gid := range groupIds {
var in = -moveout[gid]
for i := 0; i < in && len(docking) != 0; i++ {
plans[docking[0]] = gid
docking = docking[1:]
}
}
// 审批该方案
if !confirm {
return plans, nil
}
// 存储slots与group的分配方案后续执行
var slotIds []int
for sid, _ := range plans {
slotIds = append(slotIds, sid)
}
sort.Ints(slotIds)
for _, sid := range slotIds {
m, err := ctx.getSlotMapping(sid)
if err != nil {
return nil, err
}
defer s.dirtySlotsCache(m.Id)
m.Action.State = models.ActionPending
m.Action.Index = ctx.maxSlotActionIndex() + 1
m.Action.TargetId = plans[sid]
if err := s.storeUpdateSlotMapping(m); err != nil {
return nil, err
}
}
return plans, nil
}