Redis Cluster vs Codis Slot Rebalance 算法

2023-03-26
7分钟阅读时长

Redis Cluster Slot Rebalance算法

迁移原则

  • 尽可能的均匀分配Slots;
  • 尽量减少迁移的Slots的数量;

Slots分配方案

  • 计算集群的总权重,节点数量;
  • 计算每个集群节点需要移出或移入的槽数,balance 如果为正数代表需要移出的槽数,如果为负数代表需要出入槽数;
  • 把weightedNodes 根据 balance 排序从小到大;
  • 执行槽位移动,并更新集群节点 balance;

代码实现

static int clusterManagerCommandRebalance(int argc, char **argv) {
    int port = 0;
    char *ip = NULL;
    clusterManagerNode **weightedNodes = NULL;
    list *involved = NULL;
    if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
    clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
    if (!clusterManagerLoadInfoFromNode(node)) return 0;
    int result = 1, i;
    if (config.cluster_manager_command.weight != NULL) {
        for (i = 0; i < config.cluster_manager_command.weight_argc; i++) {
            char *name = config.cluster_manager_command.weight[i];
            char *p = strchr(name, '=');
            if (p == NULL) {
                clusterManagerLogErr("*** invalid input %s\n", name);
                result = 0;
                goto cleanup;
            }
            *p = '\0';
            float w = atof(++p);
            clusterManagerNode *n = clusterManagerNodeByAbbreviatedName(name);
            if (n == NULL) {
                clusterManagerLogErr("*** No such master node %s\n", name);
                result = 0;
                goto cleanup;
            }
            n->weight = w;
        }
    }
    float total_weight = 0;
    int nodes_involved = 0;
    int use_empty = config.cluster_manager_command.flags &
                    CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
    involved = listCreate();
    listIter li;
    listNode *ln;
    listRewind(cluster_manager.nodes, &li);
    /* Compute the total cluster weight. */
    // 计算集群的总权重
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *n = ln->value;
        if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
            continue;
        if (!use_empty && n->slots_count == 0) {
            n->weight = 0;
            continue;
        }
        total_weight += n->weight;
        nodes_involved++; // 集群节点数量
        listAddNodeTail(involved, n);
    }
    // 分配内存空间
    weightedNodes = zmalloc(nodes_involved * sizeof(clusterManagerNode *));
    if (weightedNodes == NULL) goto cleanup;
    /* Check cluster, only proceed if it looks sane. */
    // 检查集群,只有在它看起来正常时才继续。
    clusterManagerCheckCluster(1);
    if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
        clusterManagerLogErr("*** Please fix your cluster problems "
                             "before rebalancing\n");
        result = 0;
        goto cleanup;
    }
    /* Calculate the slots balance for each node. It's the number of
     * slots the node should lose (if positive) or gain (if negative)
     * in order to be balanced. */
    // 计算每个节点的槽位数量。
    // 它是节点为了平衡而应该移出(如果为正)或移入(如果为负)的槽位数量。
    int threshold_reached = 0, total_balance = 0;
    float threshold = config.cluster_manager_command.threshold;
    i = 0;
    listRewind(involved, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *n = ln->value;
        weightedNodes[i++] = n;
        // 计算预期槽数
        int expected = (int) (((float)CLUSTER_MANAGER_SLOTS / total_weight) *
                        n->weight);
        // 均衡前的哈希槽位和均衡后的哈希槽位的差值
        n->balance = n->slots_count - expected;
        total_balance += n->balance;
        /* Compute the percentage of difference between the
         * expected number of slots and the real one, to see
         * if it's over the threshold specified by the user. */
        // 计算期望槽数与实际槽数之间的差异百分比,看它是否超过用户指定的阈值。
        int over_threshold = 0;
        if (threshold > 0) {
            if (n->slots_count > 0) {
                float err_perc = fabs((100-(100.0*expected/n->slots_count)));
                if (err_perc > threshold) over_threshold = 1;
            } else if (expected > 1) {
                over_threshold = 1;
            }
        }
        if (over_threshold) threshold_reached = 1; // 标记需要平衡
    }
    if (!threshold_reached) {
        clusterManagerLogWarn("*** No rebalancing needed! "
                             "All nodes are within the %.2f%% threshold.\n",
                             config.cluster_manager_command.threshold);
        goto cleanup;
    }
    /* Because of rounding, it is possible that the balance of all nodes
     * summed does not give 0. Make sure that nodes that have to provide
     * slots are always matched by nodes receiving slots. */
    // 由于四舍五入,所有节点的槽数总和可能不为 0。必须确保移出的槽数与移入的槽数匹配。
    while (total_balance > 0) {
        listRewind(involved, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *n = ln->value;
            // 把多余槽数轮询分发给移入的节点
            if (n->balance <= 0 && total_balance > 0) {
                n->balance--;
                total_balance--;
            }
        }
    }
    /* Sort nodes by their slots balance. */
    // 按槽数对节点进行排序。
    qsort(weightedNodes, nodes_involved, sizeof(clusterManagerNode *),
          clusterManagerCompareNodeBalance); // 根据 balance 排序从小到大
    clusterManagerLogInfo(">>> Rebalancing across %d nodes. "
                          "Total weight = %.2f\n",
                          nodes_involved, total_weight);
    if (config.verbose) {
        for (i = 0; i < nodes_involved; i++) {
            clusterManagerNode *n = weightedNodes[i];
            printf("%s:%d balance is %d slots\n", n->ip, n->port, n->balance);
        }
    }
    /* Now we have at the start of the 'sn' array nodes that should get
     * slots, at the end nodes that must give slots.
     * We take two indexes, one at the start, and one at the end,
     * incrementing or decrementing the indexes accordingly til we
     * find nodes that need to get/provide slots. */
    // 现在我们在“sn”数组节点的开始处应该获得槽,在末端节点处必须提供槽。
    // 我们有两个索引,一个在开始,一个在结束,
    // 相应地递增或递减索引,直到我们找到需要移出/移入槽的节点。
    int dst_idx = 0;
    int src_idx = nodes_involved - 1;
    int simulate = config.cluster_manager_command.flags &
                   CLUSTER_MANAGER_CMD_FLAG_SIMULATE;
    // 例如100个槽位,原来4个节点,现在5个节点。   
    // 25,25,25,25 -> 20,20,20,20,20  weightedNodes:-20,5,5,5,5
    while (dst_idx < src_idx) {
        // balance<0 表示要从集群其它节点的槽位迁移到本节点的槽位,所以此节点是槽位的目的节点
        clusterManagerNode *dst = weightedNodes[dst_idx];
        // balance>0 表示要从集群本节点的槽位迁移到其它节点的槽位,所以此节点是槽位的源节点
        clusterManagerNode *src = weightedNodes[src_idx];
        int db = abs(dst->balance);
        int sb = abs(src->balance);
        int numslots = (db < sb ? db : sb);
        if (numslots > 0) {
            printf("Moving %d slots from %s:%d to %s:%d\n", numslots,
                                                            src->ip,
                                                            src->port,
                                                            dst->ip,
                                                            dst->port);
            /* Actually move the slots. */
            // 执行实际移动槽位
            list *lsrc = listCreate(), *table = NULL;
            listAddNodeTail(lsrc, src);
            // 获取要迁移的槽,优先获取移动slot编号小的槽
            table = clusterManagerComputeReshardTable(lsrc, numslots);
            listRelease(lsrc);
            int table_len = (int) listLength(table);
            if (!table || table_len != numslots) {
                clusterManagerLogErr("*** Assertion failed: Reshard table "
                                     "!= number of slots");
                result = 0;
                goto end_move;
            }
            if (simulate) {
                for (i = 0; i < table_len; i++) printf("#");
            } else {
                int opts = CLUSTER_MANAGER_OPT_QUIET |
                           CLUSTER_MANAGER_OPT_UPDATE;
                listRewind(table, &li);
                while ((ln = listNext(&li)) != NULL) {
                    clusterManagerReshardTableItem *item = ln->value;
                    char *err;
                    // 迁移槽位
                    result = clusterManagerMoveSlot(item->source,
                                                    dst,
                                                    item->slot,
                                                    opts, &err);
                    if (!result) {
                        clusterManagerLogErr("*** clusterManagerMoveSlot: %s\n", err);
                        zfree(err);
                        goto end_move;
                    }
                    printf("#");
                    fflush(stdout);
                }
            }
            printf("\n");
end_move:
            clusterManagerReleaseReshardTable(table);
            if (!result) goto cleanup;
        }
        /* Update nodes balance. */
        // 更新节点 balance。
        dst->balance += numslots;
        src->balance -= numslots;
        if (dst->balance == 0) dst_idx++;
        if (src->balance == 0) src_idx --;
    }
cleanup:
    if (involved != NULL) listRelease(involved);
    if (weightedNodes != NULL) zfree(weightedNodes);
    return result;
invalid_args:
    fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
    return 0;
}

Codis Slot Rebalance算法

使用场景

Codis的集群初始为2个分片,当业务增长需要扩容到4个分片的时候,我们可以手动指定slots指挥Codis进行数据迁移,也可以使用AutoRebalanceCodis自动的进行Slots数据迁移。

迁移原则

  • 尽可能的均匀分配Slots;
  • 尽量减少迁移的Slots的数量;

Slots分配方案

  • 统计当前迁移中Slots的结果,用于当前迁移方案的基础数据;
  • 按照每个Group可分配Slots的最大限制,统计Group中需要迁入/出的Slots信息;
  • 依据现有的GroupSlots的数量构建红黑树,统计分配Slots
  • 审核并存储迁移方案;

代码实现

func (s *Topom) SlotsRebalance1(confirm bool) (map[int]int, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return nil, err
    }

    // 获取所有group的id,每一个group必须拥有redis实例,
    // 依据id从小到大排序group,其中group的id最小值为1
    var groupIds []int
    for _, g := range ctx.group {
        if len(g.Servers) != 0 {
            groupIds = append(groupIds, g.Id)
        }
    }
    sort.Ints(groupIds) // 按groupId从小到大排序

    if len(groupIds) == 0 { // 如果group为空,返回错误
        return nil, errors.Errorf("no valid group could be found")
    }

    // 每一个分片(组)都拥有3个属性
    var (
        // 需要给当前的group分配的slots的数量
        assigned = make(map[int]int)
        // 当前group需要移出的slots信息,其中key为group的id,value为slots的数组
        pendings = make(map[int][]int)
        // 当前group需要移出/入(为负数时代表移入)的slots数量,
        // 其中key为group的id,value为slots的数量
        moveout  = make(map[int]int)
        docking  []int // 为需要最终操作的slots的列表
    )
    // 获取group的当前的slots的数量
    var groupSize = func(gid int) int {
        return assigned[gid] + len(pendings[gid]) - moveout[gid]
    }

    // don't migrate slot if it's being migrated
    // 如果正在迁移,请不要迁移插槽
    // 遍历slots,获取正在迁移中的slots的迁移结果并该结果计入本次的迁移统计
    for _, m := range ctx.slots {
        if m.Action.State != models.ActionNothing {
            assigned[m.Action.TargetId]++ // 累加计算每个groupId已经分配的slot数量
        }
    }

    // 按照平均值计算每个group可以分到的slots的数量下限(总量为1024)
    var lowerBound = MaxSlotNum / len(groupIds)

    // don't migrate slot if groupSize < lowerBound
    // 如果 groupSize < lowerBound,则不要迁移插槽
    // 遍历slots,统计需要迁移的slots信息
    for _, m := range ctx.slots {
        // 对于处于迁移状态中的slots不执行任何操作
        if m.Action.State != models.ActionNothing {
            continue
        }
        // 当前的slots属于集群中的一个group
        if m.GroupId != 0 {
            // slot所归属group中的slots的数量小于group的平均值,
            // 则需要往这个group中分配新的slot
            if groupSize(m.GroupId) < lowerBound {
                assigned[m.GroupId]++
            } else {
                // slot所归属group中的slots的数量大于group的平均值,
                // 则需要将这个slot移出它所归属的group
                pendings[m.GroupId] = append(pendings[m.GroupId], m.Id)
            }
        }
    }

    // 创建一个自定义比较器的红黑树,这棵树代表着需要进行slots迁移的所有group
    // key是group的id,slots最少的在左面,slots最多的在右面,key是group的id
    var tree = rbtree.NewWith(func(x, y interface{}) int {
        var gid1 = x.(int)
        var gid2 = y.(int)
        if gid1 != gid2 {
            if d := groupSize(gid1) - groupSize(gid2); d != 0 {
                return d
            }
            return gid1 - gid2
        }
        return 0
    })
    for _, gid := range groupIds {
        tree.Put(gid, nil)
    }

    // assign offline slots to the smallest group
    // 将离线插槽分配给最小的组
    for _, m := range ctx.slots { // 遍历所有的slots
        // 对于处于迁移状态中的slots不执行任何操作
        if m.Action.State != models.ActionNothing {
            continue
        }
        if m.GroupId != 0 {
            continue
        }
        // 有一些slots不属于任何group,需要将这些slots分配给slots最少的group,
        // 也就是红黑树左面的最小的group
        dest := tree.Left().Key.(int)
        tree.Remove(dest)

        docking = append(docking, m.Id) // 对接,记录slot
        moveout[dest]-- // 记录最小分组需要move的个数

        tree.Put(dest, nil)
    }

    // 每一个group能够获取slots的数量的上限,其实约等于 lowerBound + 1
    var upperBound = (MaxSlotNum + len(groupIds) - 1) / len(groupIds)

    // rebalance between different server groups
    // 在不同的服务器组之间重新平衡
    // 树中需要迁移的group大于等于2则需要进行rebalance,只有一个group就不需要了
    for tree.Size() >= 2 {
        from := tree.Right().Key.(int)
        tree.Remove(from)

        // 当前group已经把所有需要移出的slots迁移出完毕
        if len(pendings[from]) == moveout[from] {
            continue
        }
        dest := tree.Left().Key.(int)
        tree.Remove(dest)

        var (
            fromSize = groupSize(from)
            destSize = groupSize(dest)
        )
        // 右面的group中slots的数量小于等于每个group的平均值,
        // 则表示该group迁移完成,不需要再次加入tree中
        if fromSize <= lowerBound {
            break
        }
        // 左面的group中slots的数量大于等于每个group的最大值,
        // 则表示该group也迁移完成,不需要再次加入tree中
        if destSize >= upperBound {
            break
        }
        // 左右group中的slots的数量相差小于等于1,
        // 则表示这个两个group也不需要再次加入tree中了
        if d := fromSize - destSize; d <= 1 {
            break
        }
        // 右面的group移出了一个,左面的group加入了一个
        moveout[from]++
        moveout[dest]--

        // 还需要继续迁移,将这两个group继续加入树中
        tree.Put(from, nil)
        tree.Put(dest, nil)
    }

    for gid, n := range moveout {
        if n < 0 {
            continue
        }
        // 当前group需要移出n个slots
        if n > 0 {
            // 倒序遍历需要移出的slots的列表,将需要迁移的slots加入到docking中
            sids := pendings[gid]
            sort.Sort(sort.Reverse(sort.IntSlice(sids)))

            docking = append(docking, sids[0:n]...)
            pendings[gid] = sids[n:]
        }
        delete(moveout, gid)
    }
    sort.Ints(docking) // 排序需要操作的slots列表

    // 遍历group,获取每一个group需要迁入多少个slots并将docking中的slots分配给对应的group,
    // plans就是最终的分配方案,将某一个slot分配给某一个group
    var plans = make(map[int]int)
    // 构建迁移方案,填充平衡需要移动slot=>gid
    for _, gid := range groupIds {
        var in = -moveout[gid]
        for i := 0; i < in && len(docking) != 0; i++ {
            plans[docking[0]] = gid
            docking = docking[1:]
        }
    }

    // 审批该方案
    if !confirm {
        return plans, nil
    }

    // 存储slots与group的分配方案后续执行
    var slotIds []int
    for sid, _ := range plans {
        slotIds = append(slotIds, sid)
    }
    sort.Ints(slotIds)

    for _, sid := range slotIds {
        m, err := ctx.getSlotMapping(sid)
        if err != nil {
            return nil, err
        }
        defer s.dirtySlotsCache(m.Id)

        m.Action.State = models.ActionPending
        m.Action.Index = ctx.maxSlotActionIndex() + 1
        m.Action.TargetId = plans[sid]
        if err := s.storeUpdateSlotMapping(m); err != nil {
            return nil, err
        }
    }
    return plans, nil
}

参考链接

关注公众号获得更多精彩文章

公众号:程序员大兵