polaris-go 源码走读

2022-10-28
4分钟阅读时长

provider

实例化ProviderAPI

  • NewProviderAPI() -> 根据默认配置文件./polaris.yaml 实例化
    • api.newProviderAPI->api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]->&providerAPI{rawAPI: p}
  • NewProviderAPIByFile(path string) -> 根据指定配置文件实例化
    • api.newProviderAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->[api.newProviderAPIByContext]->&providerAPI{rawAPI: p}
  • NewProviderAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
    • api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]
  • NewProviderAPIByContext(context api.SDKContext) -> 根据指定api.SDKContext实例化
    • api.newProviderAPIByContext->&providerAPI{rawAPI: p}
  • NewProviderAPIByAddress(address …string) -> 根据指定 polaris-server 地址实例化
    • api.newProviderAPIByAddress->[config.NewDefaultConfiguration]->api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]->&providerAPI{rawAPI: p}

以上所有方法最终都会走到api.newProviderAPIByContex方法, 然后返回&providerAPI{rawAPI: p}

![image-20221104120449226](/Users/lbbniu/Library/Application Support/typora-user-images/image-20221104120449226.png)

相关接口定义

// ProviderAPI CL5服务端API的主接口
type ProviderAPI interface {
	// RegisterInstance
	// minimum supported version of polaris-server is v1.10.0
	RegisterInstance(instance *InstanceRegisterRequest) (*model.InstanceRegisterResponse, error)
	// Deregister synchronize the anti registration service
	Deregister(instance *InstanceDeRegisterRequest) error
	// Destroy the api is destroyed and cannot be called again
	Destroy()
}

RegisterInstance

// providerAPI 调用者对外函数实现
type providerAPI struct {
	rawAPI api.ProviderAPI
}

func (p *providerAPI) RegisterInstance(instance *InstanceRegisterRequest)
  p.rawAPI.RegisterInstance((*api.InstanceRegisterRequest)(instance))
	c.context.GetEngine().SyncRegisterV2(&instance.InstanceRegisterRequest)
	  request.SetDefaultTTL() // 设置默认ttl
	  // 同步进行服务注册
	  e.doSyncRegister(request, registerstate.CreateRegisterV2Header())
	    data.BuildControlParam(instance, e.configuration, param) // build 实例参数
        data.RetrySyncCall("register", ... // 带重试机制
          // connector proxy 方法
          e.connector.RegisterInstance(request.(*model.InstanceRegisterRequest), header)  
            p.ServerConnector.RegisterInstance(req, header) // serverconnector plugin
               conn, err := g.connManager.GetConnection(opKey, config.DiscoverCluster) // 获取server连接
               namingClient := namingpb.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) // grpc 客户端
               reqProto := registerRequestToProto(req) // 返回 namingpb.Instance 类型
               pbResp, err := namingClient.RegisterInstance(ctx, reqProto) // grpc 调用完成服务注册
	  // 心跳上报以及出错重新同步进行服务注册
	  e.registerStates.PutRegister(request, e.doSyncRegister, e.SyncHeartbeat)
        // regis = e.doSyncRegister, beat = e.SyncHeartbeat
        go c.runHeartbeat(ctx, state, regis, beat)
          ticker := time.NewTicker(time.Duration(*instance.TTL) * time.Second)
          <-ticker.C // 定时上报健康状态
            beat(hbReq) // e.SyncHeartbeat(hbReq)
              // connector proxy 方法
              e.connector.Heartbeat(request.(*model.InstanceHeartbeatRequest))
                p.ServerConnector.Heartbeat(instance) // serverconnector plugin
                  conn, err := g.connManager.GetConnection(opKey, config.HealthCheckCluster) 
                  namingClient := namingpb.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn))
                  reqProto := heartbeatRequestToProto(req) // 心跳上报数据组装:namingpb.Instance 类型
                  pbResp, err := namingClient.Heartbeat(ctx, reqProto) // grpc调用更新心跳数据
            // 如果心跳上报失败,根据以下条件判断是否需要重新注册
            needRegis := errCnt > _maxHeartbeatErrorCount && time.Since(state.lastRegisterTime) > minInterval
            // 如果 needRegis 为 true,重新注册, regis = e.doSyncRegister
            _, err = regis(instance, CreateRegisterV2Header()) 

Deregister

// providerAPI 调用者对外函数实现
type providerAPI struct {
	rawAPI api.ProviderAPI
}

// Deregister 同步反注册
func (p *providerAPI) Deregister(instance *InstanceDeRegisterRequest) error {
  p.rawAPI.Deregister((*api.InstanceDeRegisterRequest)(instance))
    c.context.GetEngine().SyncDeregister(&instance.InstanceDeRegisterRequest)
      _, err := data.RetrySyncCall("deregister", ... // 带重试机制
        e.connector.DeregisterInstance(request.(*model.InstanceDeRegisterRequest)) // serverconnector plugin
          conn, err := g.connManager.GetConnection(opKey, config.DiscoverCluster) // 获取server连接
          namingClient := namingpb.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) // grpc 客户端
          reqProto := deregisterRequestToProto(req) // 反注册数据组装: namingpb.Instance 类型
          pbResp, err := namingClient.DeregisterInstance(ctx, reqProto) // grpc调用反注册

consumer

实例化 ConsumerAPI

  • NewConsumerAPI() -> 根据默认配置文件./polaris.yaml 实例化
    • api.newConsumerAPI->api.newConsumerAPIByConfig->api.InitContextByConfig->&consumerAPI{context}
  • NewConsumerAPIByFile(path string) -> 根据指定配置文件实例化
    • newConsumerAPIByFile->InitContextByFile->InitContextByStream->InitContextByConfig->&consumerAPI{context}
  • NewConsumerAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
    • api.newConsumerAPIByConfig->api.InitContextByConfig–>&consumerAPI{context}
  • NewConsumerAPIByContext(context api.SDKContext) -> 根据指定api.SDKContext实例化
    • api.newConsumerAPIByContext->&consumerAPI{context}
  • NewConsumerAPIByAddress(address …string) -> 根据指定 polaris-server 地址实例化
    • api.newConsumerAPIByAddress->[config.NewDefaultConfiguration]->api.newConsumerAPIByConfig->api.InitContextByConfig->&consumerAPI{context}

以上所有方法最终都会走到api.InitContextByConfig(cfg config.Configuration)方法, 然后返回&consumerAPI{context}

相关接口定义

// ConsumerAPI 主调端API方法
type ConsumerAPI interface {
	// GetOneInstance 获取单个服务(会执行路由链与负载均衡,获取负载均衡后的服务实例)
	GetOneInstance(req *GetOneInstanceRequest) (*model.OneInstanceResponse, error)
	// GetInstances 获取可用的服务列表(会执行路由链,默认去掉隔离以及不健康的服务实例)
	GetInstances(req *GetInstancesRequest) (*model.InstancesResponse, error)
	// GetAllInstances 获取完整的服务列表(包括隔离及不健康的服务实例)
	GetAllInstances(req *GetAllInstancesRequest) (*model.InstancesResponse, error)
	// GetRouteRule 同步获取服务路由规则
	GetRouteRule(req *GetServiceRuleRequest) (*model.ServiceRuleResponse, error)
	
    // UpdateServiceCallResult 上报服务调用结果
	UpdateServiceCallResult(req *ServiceCallResult) error
	
    
	// WatchService 订阅服务消息
	WatchService(req *WatchServiceRequest) (*model.WatchServiceResponse, error)
	// GetServices 根据业务同步获取批量服务
	GetServices(req *GetServicesRequest) (*model.ServicesResponse, error)
	// InitCalleeService 初始化服务运行中需要的被调服务
	InitCalleeService(req *InitCalleeServiceRequest) error
    
    // Destroy 销毁API,销毁后无法再进行调用
	Destroy()
}

实例化 api.SDKContext

  • NewSDKContext() -> 根据默认配置文件./polaris.yaml创建SDK上下文
    • api.InitContextByConfig(config.NewDefaultConfigurationWithDomain())
  • NewSDKContextByAddress(address …string) -> 根据address创建SDK上下文
    • api.InitContextByConfig(config.NewDefaultConfiguration(address))
  • NewSDKContextByConfig(cfg config.Configuration) -> 根据配置创建SDK上下文
    • api.InitContextByConfig(cfg)
// SDKContext .
// @brief SDK配置对象,每个API实例都会挂载一个context,包含:
// 插件实例列表
// 配置实例
// 执行流程引擎,包括定时器等
type SDKContext interface {
	// Destroy
	// @brief 销毁SDK上下文
	Destroy()

	// IsDestroyed
	// @brief SDK上下文是否已经销毁
	IsDestroyed() bool

	// GetConfig
	// @brief 获取全局配置信息
	GetConfig() config.Configuration

	// GetPlugins
	// @brief 获取插件列表
	GetPlugins() plugin.Manager

	// GetEngine
	// @brief 获取执行引擎
	GetEngine() model.Engine

	// GetValueContext
	// @brief 获取值上下文
	GetValueContext() model.ValueContext
}

router

实例化 RouterAPI

  • NewRouterAPI() -> 根据默认配置文件./polaris.yaml 实例化
    • NewRouterAPI->NewRouterAPIByConfig->api.InitContextByConfig->&routerAPI{context}
  • NewRouterAPIByFile(path string) -> 根据指定配置文件实例化
    • NewRouterAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->&routerAPI{context}
  • NewRouterAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
    • NewRouterAPIByConfig->api.InitContextByConfig->&routerAPI{context}
  • NewRouterAPIByContext(context api.SDKContext) -> 根据指定api.SDKContext实例化
    • NewRouterAPIByContext->&routerAPI{context}
  • NewRouterAPIByAddress(address …string) -> 根据指定 polaris-server 地址实例化
    • NewRouterAPIByAddress->config.NewDefaultConfiguration(address)->NewRouterAPIByConfig->api.InitContextByConfig(cfg)->&routerAPI{context}

以上所有方法最终都会走到api.InitContextByConfig(cfg config.Configuration)方法, 然后返回&routerAPI{context}

相关接口定义

// RouterAPI 路由API方法
type RouterAPI interface {
	// ProcessRouters process routers to filter instances
	ProcessRouters(*ProcessRoutersRequest) (*model.InstancesResponse, error)
	// ProcessLoadBalance process load balancer to get the target instances
	ProcessLoadBalance(*ProcessLoadBalanceRequest) (*model.OneInstanceResponse, error)
}

limiter

实例化 LimitAPI

  • NewLimitAPI() -> 根据默认配置文件./polaris.yaml 实例化
    • api.newLimitAPI->api.newLimitAPIByConfig->api.InitContextByConfig->&limitAPI{context}
  • NewLimitAPIByFile(path string) -> 根据指定配置文件实例化
    • api.newLimitAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->&limitAPI{context}
  • NewLimitAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
    • api.newLimitAPIByConfig->api.InitContextByConfig->&limitAPI{context}
  • NewLimitAPIByContext(context api.SDKContext) -> 根据指定api.SDKContext实例化
    • api.newLimitAPIByContext->&limitAPI{context}
  • NewLimitAPIByAddress(address …string) -> 根据指定 polaris-server 地址实例化
    • api.newLimitAPIByAddress->[config.NewDefaultConfiguration]->api.newLimitAPIByConfig->api.InitContextByConfig->&limitAPI{context}

以上所有方法最终都会走到api.InitContextByConfig(cfg config.Configuration)方法, 然后返回&limitAPI{context}

相关接口定义

// LimitAPI 限流相关的API相关接口
type LimitAPI interface {
	// GetQuota 获取限流配额,一次接口只获取一个配额
	GetQuota(request QuotaRequest) (QuotaFuture, error)
	// Destroy 销毁API,销毁后无法再进行调用
	Destroy()
}

config

实例化 ConfigAPI

  • NewConfigAPI() -> 根据默认配置文件./polaris.yaml 实例化
    • api.newConfigFileAPI->api.newConfigFileAPIByConfig->api.InitContextByConfig->&configAPI{rawAPI: rawAPI}
  • NewConfigAPIByFile(path string) -> 根据指定配置文件实例化
    • api.newConfigFileAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->&configAPI{rawAPI: rawAPI}
  • NewConfigAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
    • api.newConfigFileAPIByConfig->api.InitContextByConfig->&configAPI{rawAPI: rawAPI}
  • NewConfigAPIByContext(context api.SDKContext) -> 根据指定api.SDKContext实例化
    • api.newConfigFileAPIBySDKContext->&configAPI{rawAPI: rawAPI}

以上前3个方法最终都会调用 api.InitContextByConfig(cfg config.Configuration) 方法初始化SDKContext, 然后返回&configAPI{rawAPI: rawAPI}

相关接口定义

// ConfigAPI 配置文件的 API.
type ConfigAPI interface {
	// GetConfigFile 获取配置文件
	GetConfigFile(namespace, fileGroup, fileName string) (ConfigFile, error)
}

// ConfigFile 文本类型配置文件对象
type ConfigFile interface {
	ConfigFileMetadata

	// GetContent 获取配置文件内容
	GetContent() string
	// HasContent 是否有配置内容
	HasContent() bool
	// AddChangeListenerWithChannel 增加配置文件变更监听器
	AddChangeListenerWithChannel(chan ConfigFileChangeEvent)
	// AddChangeListener 增加配置文件变更监听器
	AddChangeListener(cb OnConfigFileChange)
}

关注公众号获得更多精彩文章

公众号:程序员大兵