【腾讯云 Finops Crane 集训营】EHPA控制器核心源码分析
官方文档:https://gocrane.io/
Github: https://github.com/gocrane/crane
启动流程
main 函数
craned 启动入口在cmd/craned/main.go
文件中,main
方法代码如下
func main() {
logs.InitLogs()
defer logs.FlushLogs()
ctx := signals.SetupSignalHandler()
if err := app.NewManagerCommand(ctx).Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
下面进行逐行分析:
- 2~3 行初始化日志,历史defer在程序退出前刷新缓存区
- 5行通过
controller-runtime
包获取信号监听可取消的context - 7行创建
cobra.Command
命令行实例并直接调用Execute
方法启动应用,如果出错退出程序
app.NewManagerCommand
方法在cmd/craned/app/manager.go
文件中,函数中主要是创建Options
和cobra.Command
,并初始化命令行参数选项。cobra.Command
在执行的时候回调用cmd/craned/app/manager.go
文件中的Run
函数。
Run 函数
下面来重点分析下Run
函数。代码如下:
// Run runs the craned with options. This should never exit.
func Run(ctx context.Context, opts *options.Options) error {
config := ctrl.GetConfigOrDie()
config.QPS = float32(opts.ApiQps)
config.Burst = opts.ApiBurst
ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: opts.MetricsAddr,
Port: 9443,
HealthProbeBindAddress: opts.BindAddr,
LeaderElection: opts.LeaderElection.LeaderElect,
LeaderElectionID: "craned",
LeaderElectionNamespace: known.CraneSystemNamespace,
}
if opts.CacheUnstructured {
ctrlOptions.NewClient = NewCacheUnstructuredClient
}
mgr, err := ctrl.NewManager(config, ctrlOptions)
if err != nil {
klog.ErrorS(err, "unable to start crane manager")
return err
}
if err := mgr.AddHealthzCheck("ping", healthz.Ping); err != nil {
klog.ErrorS(err, "failed to add health check endpoint")
return err
}
// initialize data sources and predictor
realtimeDataSources, historyDataSources, dataSourceProviders := initDataSources(mgr, opts)
predictorMgr := initPredictorManager(opts, realtimeDataSources, historyDataSources)
initScheme()
initFieldIndexer(mgr)
initWebhooks(mgr, opts)
podOOMRecorder := &oom.PodOOMRecorder{
Client: mgr.GetClient(),
OOMRecordMaxNumber: opts.OOMRecordMaxNumber,
}
if err := podOOMRecorder.SetupWithManager(mgr); err != nil {
klog.Exit(err, "Unable to create controller", "PodOOMRecorder")
}
go func() {
if err := podOOMRecorder.Run(ctx.Done()); err != nil {
klog.Warningf("Run oom recorder failed: %v", err)
}
}()
recommenderMgr := initRecommenderManager(opts, podOOMRecorder, realtimeDataSources, historyDataSources)
initControllers(podOOMRecorder, mgr, opts, predictorMgr, recommenderMgr, historyDataSources[providers.PrometheusDataSource])
// initialize custom collector metrics
initMetricCollector(mgr)
runAll(ctx, mgr, predictorMgr, dataSourceProviders[providers.PrometheusDataSource], opts)
return nil
}
逐行分析:
- 7-18行根据命令行参数初始化
ctrl.Options
实例,主要参数有metrics地址、健康检查地址、LeaderElection等 - 20~29行创建控制器管理器实例,并配置健康检查地址和对应的函数
- 31行初始化历史指标查询数据源
- 32行初始化预测算法管理器
- 34行根据配置启动
feature
- 35行初始化工作节点名索引器
- 36行初始化k8s自定义crd的webhook服务
- 38~49行实例化Pod OOM控制器获取Pod OOM的事件记录
- 51行初始化推荐框架管理器
- 52行初始化自定义crd的控制器,比如ehpa、evpa、rr(推荐规则)等k8s控制器
- 54行初始化自定义指标收集器
- 55行传递相关参数调用
runAll
函数启动服务,在runAll
中利用errorgroup
启动了预测管理、容器运行时管理、crane的dashboard接口服务。
如果对初始化阶段调用的各个函数感兴趣的话,可以自行深入去通读相关源码,启动分析到此结束,下面把主要精力放在ehpa控制源码的分析上面。
EHPA 核心源码
初始化
ehpa 控制器初始化,并启动控制器循环机制的代码逻辑在cmd/craned/app/manager.go
文件中,更具体点是在initControllers
函数中,代码如下:
var ehpaController = &ehpa.EffectiveHPAController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RestMapper: mgr.GetRESTMapper(),
Recorder: mgr.GetEventRecorderFor("effective-hpa-controller"),
ScaleClient: scaleClient,
Config: opts.EhpaControllerConfig,
}
if err := (ehpaController).SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "EffectiveHPAController")
}
逐行分析:
- 2~7行给控制器核心字段赋值
- Client是k8s apisever的客户端,客户端可以对 Kubernetes 对象执行 CRUD 操作。
- Scheme 定义了序列化和反序列化 API 对象的方法,一个类型 用于在 Go 之间转换组、版本和种类信息的注册表模式,以及不同版本的 Go 模式之间的映射。 一个方案是随着时间的推移,版本化 API 和版本化配置的基础。
- RestMapper 允许客户端将资源映射到种类,并映射种类和版本到用于操作这些对象的接口。
- Recorder 事件上报客户端
- ScaleClient 副本数控制客户端
- Config ehpa的核心配置,主要配置是创建hpa时要向下传递的Label和Annotation,它们都有根据前缀匹配ehpa的配置和直接根据配置向下传播的配置
- 10行调用
SetupWithManager
启动ehpa控制,其中最核心的一行代码如下。
return ctrl.NewControllerManagedBy(mgr).
For(&autoscalingapi.EffectiveHorizontalPodAutoscaler{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&autoscalingv2.HorizontalPodAutoscaler{}).
Owns(&predictionapi.TimeSeriesPrediction{}).
Complete(c)
控制循环 Reconcile
func (c *EffectiveHPAController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(4).Infof("Got ehpa %s", req.NamespacedName)
ehpa := &autoscalingapi.EffectiveHorizontalPodAutoscaler{}
err := c.Client.Get(ctx, req.NamespacedName, ehpa)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
RecordMetrics(ehpa)
...
}
- 4~8 行更加参数 ctrl.Request 获取 EHPA 配置数据,如果已经删除直接退出本次后续逻辑
- 10 行记录metrics数据,主要记录副本数统计数据
func (c *EffectiveHPAController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
newStatus := ehpa.Status.DeepCopy()
scale, mapping, err := utils.GetScale(ctx, c.RestMapper, c.ScaleClient, ehpa.Namespace, ehpa.Spec.ScaleTargetRef)
if err != nil {
c.Recorder.Event(ehpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
klog.Errorf("Failed to get scale, ehpa %s", klog.KObj(ehpa))
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedGetScale", "Failed to get scale")
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
if scale.Spec.Replicas == 0 && *ehpa.Spec.MinReplicas != 0 {
newStatus.CurrentReplicas = &scale.Spec.Replicas
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
...
}
- 5~12行 判断ehpa目标资源是否支持扩缩容,如果不支持,记录状态数据后,退出后续逻辑
- 14~19行判断副本数配置,如果配置异常,记录状态数据后,退出后续逻辑
func (c *EffectiveHPAController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
var substitute *autoscalingapi.Substitute
if ehpa.Spec.ScaleStrategy == autoscalingapi.ScaleStrategyPreview {
substitute, err = c.ReconcileSubstitute(ctx, ehpa, scale)
if err != nil {
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedReconcileSubstitute", "Failed to reconcile substitute")
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
}
// reconcile prediction if enabled
var tsp *predictionapi.TimeSeriesPrediction
if utils.IsEHPAPredictionEnabled(ehpa) && utils.IsEHPAHasPredictionMetric(ehpa) {
tsp, err = c.ReconcilePredication(ctx, ehpa)
if err != nil {
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedReconcilePrediction", err.Error())
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
setPredictionCondition(newStatus, tsp.Status.Conditions)
}
hpa, err := c.ReconcileHPA(ctx, ehpa, substitute, tsp)
if err != nil {
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedReconcileHPA", err.Error())
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
newStatus.ExpectReplicas = &hpa.Status.DesiredReplicas
newStatus.CurrentReplicas = &hpa.Status.CurrentReplicas
if hpa.Status.LastScaleTime != nil && newStatus.LastScaleTime != nil && hpa.Status.LastScaleTime.After(newStatus.LastScaleTime.Time) {
newStatus.LastScaleTime = hpa.Status.LastScaleTime
}
setHPACondition(newStatus, hpa.Status.Conditions)
// sync custom metric to annotations
if hpa.Status.CurrentMetrics != nil {
var currentMetrics string
if ehpa.Annotations == nil {
ehpa.Annotations = map[string]string{}
}
currentMetrics = ehpa.Annotations[known.EffectiveHorizontalPodAutoscalerCurrentMetricsAnnotation]
valueBytes, err := yaml.Marshal(hpa.Status.CurrentMetrics)
if err == nil && currentMetrics != string(valueBytes) {
ehpa.Annotations[known.EffectiveHorizontalPodAutoscalerCurrentMetricsAnnotation] = string(valueBytes)
klog.V(4).Infof("Updating ehpa %s current metrics: %s.", klog.KObj(ehpa), string(valueBytes))
err := c.Client.Update(ctx, ehpa)
if err != nil {
klog.Errorf("Failed to update current metrics for ehpa %s: %v", klog.KObj(ehpa), err)
}
klog.Infof("Updated ehpa %s current metrics: %s.", klog.KObj(ehpa), string(valueBytes))
}
}
// scale target to its specific replicas for Preview strategy
if ehpa.Spec.ScaleStrategy == autoscalingapi.ScaleStrategyPreview && ehpa.Spec.SpecificReplicas != nil && *ehpa.Spec.SpecificReplicas != scale.Status.Replicas {
scale.Spec.Replicas = *ehpa.Spec.SpecificReplicas
updatedScale, err := c.ScaleClient.Scales(scale.Namespace).Update(ctx, mapping.Resource.GroupResource(), scale, metav1.UpdateOptions{})
if err != nil {
c.Recorder.Event(ehpa, v1.EventTypeWarning, "FailedManualScale", err.Error())
msg := fmt.Sprintf("Failed to manual scale target to specific replicas, ehpa %s replicas %d", klog.KObj(ehpa), *ehpa.Spec.SpecificReplicas)
klog.Error(err, msg)
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedScale", msg)
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
klog.Infof("Manual scale target to specific replicas, ehpa %s replicas %d", klog.KObj(ehpa), *ehpa.Spec.SpecificReplicas)
now := metav1.Now()
newStatus.LastScaleTime = &now
newStatus.CurrentReplicas = &updatedScale.Status.Replicas
}
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionTrue, "EffectiveHorizontalPodAutoscalerReady", "Effective HPA is ready")
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, nil
}
- 3~11行判断EHPA是不是预览(Preview)模式,如果是预览模式的话,控制器会创建或者更新
Substitute
资源对象 - 13~23行判断EHPA是否开启,如果开启则创建或者更新对应的
TimeSeriesPrediction
资源对象 - 25~30行创建对应的
HPA
资源对象 - 32~39行更新当前EHPA状态数据
- 42~59行根据HPA的状态数据给当前EHPA增加注解数据
- 62~81行更新状态数据和
scale
副本数配置
从上面的分析我们可以看到EHPA控制器会创建TimeSeriesPrediction
资源,创建TimeSeriesPrediction
资源后,TimeSeriesPrediction
控制器就开始工作,会驱动对应的预测算法获取历史监控数据,进行副本数预测,病生成预测数据,metric-adapter会读取最终的预测数据,给HPA提供指标数据,从而干预影响workload的的扩缩容。
作者能力有限,如果有理解不正确的还请大佬们指出。