polaris-go 源码走读
provider
实例化ProviderAPI
- NewProviderAPI() -> 根据默认配置文件
./polaris.yaml
实例化- api.newProviderAPI->api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]->
&providerAPI{rawAPI: p}
- api.newProviderAPI->api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]->
- NewProviderAPIByFile(path string) -> 根据指定配置文件实例化
- api.newProviderAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->[api.newProviderAPIByContext]->
&providerAPI{rawAPI: p}
- api.newProviderAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->[api.newProviderAPIByContext]->
- NewProviderAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
- api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]
- NewProviderAPIByContext(context api.SDKContext) -> 根据指定
api.SDKContext
实例化- api.newProviderAPIByContext->
&providerAPI{rawAPI: p}
- api.newProviderAPIByContext->
- NewProviderAPIByAddress(address …string) -> 根据指定
polaris-server
地址实例化- api.newProviderAPIByAddress->[config.NewDefaultConfiguration]->api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]->
&providerAPI{rawAPI: p}
- api.newProviderAPIByAddress->[config.NewDefaultConfiguration]->api.newProviderAPIByConfig->api.InitContextByConfig->[api.newProviderAPIByContext]->
以上所有方法最终都会走到api.newProviderAPIByContex
方法, 然后返回&providerAPI{rawAPI: p}
![image-20221104120449226](/Users/lbbniu/Library/Application Support/typora-user-images/image-20221104120449226.png)
相关接口定义
// ProviderAPI CL5服务端API的主接口
type ProviderAPI interface {
// RegisterInstance
// minimum supported version of polaris-server is v1.10.0
RegisterInstance(instance *InstanceRegisterRequest) (*model.InstanceRegisterResponse, error)
// Deregister synchronize the anti registration service
Deregister(instance *InstanceDeRegisterRequest) error
// Destroy the api is destroyed and cannot be called again
Destroy()
}
RegisterInstance
// providerAPI 调用者对外函数实现
type providerAPI struct {
rawAPI api.ProviderAPI
}
func (p *providerAPI) RegisterInstance(instance *InstanceRegisterRequest)
p.rawAPI.RegisterInstance((*api.InstanceRegisterRequest)(instance))
c.context.GetEngine().SyncRegisterV2(&instance.InstanceRegisterRequest)
request.SetDefaultTTL() // 设置默认ttl
// 同步进行服务注册
e.doSyncRegister(request, registerstate.CreateRegisterV2Header())
data.BuildControlParam(instance, e.configuration, param) // build 实例参数
data.RetrySyncCall("register", ... // 带重试机制
// connector proxy 方法
e.connector.RegisterInstance(request.(*model.InstanceRegisterRequest), header)
p.ServerConnector.RegisterInstance(req, header) // serverconnector plugin
conn, err := g.connManager.GetConnection(opKey, config.DiscoverCluster) // 获取server连接
namingClient := namingpb.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) // grpc 客户端
reqProto := registerRequestToProto(req) // 返回 namingpb.Instance 类型
pbResp, err := namingClient.RegisterInstance(ctx, reqProto) // grpc 调用完成服务注册
// 心跳上报以及出错重新同步进行服务注册
e.registerStates.PutRegister(request, e.doSyncRegister, e.SyncHeartbeat)
// regis = e.doSyncRegister, beat = e.SyncHeartbeat
go c.runHeartbeat(ctx, state, regis, beat)
ticker := time.NewTicker(time.Duration(*instance.TTL) * time.Second)
<-ticker.C // 定时上报健康状态
beat(hbReq) // e.SyncHeartbeat(hbReq)
// connector proxy 方法
e.connector.Heartbeat(request.(*model.InstanceHeartbeatRequest))
p.ServerConnector.Heartbeat(instance) // serverconnector plugin
conn, err := g.connManager.GetConnection(opKey, config.HealthCheckCluster)
namingClient := namingpb.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn))
reqProto := heartbeatRequestToProto(req) // 心跳上报数据组装:namingpb.Instance 类型
pbResp, err := namingClient.Heartbeat(ctx, reqProto) // grpc调用更新心跳数据
// 如果心跳上报失败,根据以下条件判断是否需要重新注册
needRegis := errCnt > _maxHeartbeatErrorCount && time.Since(state.lastRegisterTime) > minInterval
// 如果 needRegis 为 true,重新注册, regis = e.doSyncRegister
_, err = regis(instance, CreateRegisterV2Header())
Deregister
// providerAPI 调用者对外函数实现
type providerAPI struct {
rawAPI api.ProviderAPI
}
// Deregister 同步反注册
func (p *providerAPI) Deregister(instance *InstanceDeRegisterRequest) error {
p.rawAPI.Deregister((*api.InstanceDeRegisterRequest)(instance))
c.context.GetEngine().SyncDeregister(&instance.InstanceDeRegisterRequest)
_, err := data.RetrySyncCall("deregister", ... // 带重试机制
e.connector.DeregisterInstance(request.(*model.InstanceDeRegisterRequest)) // serverconnector plugin
conn, err := g.connManager.GetConnection(opKey, config.DiscoverCluster) // 获取server连接
namingClient := namingpb.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) // grpc 客户端
reqProto := deregisterRequestToProto(req) // 反注册数据组装: namingpb.Instance 类型
pbResp, err := namingClient.DeregisterInstance(ctx, reqProto) // grpc调用反注册
consumer
实例化 ConsumerAPI
- NewConsumerAPI() -> 根据默认配置文件
./polaris.yaml
实例化- api.newConsumerAPI->api.newConsumerAPIByConfig->api.InitContextByConfig->
&consumerAPI{context}
- api.newConsumerAPI->api.newConsumerAPIByConfig->api.InitContextByConfig->
- NewConsumerAPIByFile(path string) -> 根据指定配置文件实例化
- newConsumerAPIByFile->InitContextByFile->InitContextByStream->InitContextByConfig->
&consumerAPI{context}
- newConsumerAPIByFile->InitContextByFile->InitContextByStream->InitContextByConfig->
- NewConsumerAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
- api.newConsumerAPIByConfig->api.InitContextByConfig–>
&consumerAPI{context}
- api.newConsumerAPIByConfig->api.InitContextByConfig–>
- NewConsumerAPIByContext(context api.SDKContext) -> 根据指定
api.SDKContext
实例化- api.newConsumerAPIByContext->
&consumerAPI{context}
- api.newConsumerAPIByContext->
- NewConsumerAPIByAddress(address …string) -> 根据指定
polaris-server
地址实例化- api.newConsumerAPIByAddress->[config.NewDefaultConfiguration]->api.newConsumerAPIByConfig->api.InitContextByConfig->
&consumerAPI{context}
- api.newConsumerAPIByAddress->[config.NewDefaultConfiguration]->api.newConsumerAPIByConfig->api.InitContextByConfig->
以上所有方法最终都会走到api.InitContextByConfig(cfg config.Configuration)
方法, 然后返回&consumerAPI{context}
相关接口定义
// ConsumerAPI 主调端API方法
type ConsumerAPI interface {
// GetOneInstance 获取单个服务(会执行路由链与负载均衡,获取负载均衡后的服务实例)
GetOneInstance(req *GetOneInstanceRequest) (*model.OneInstanceResponse, error)
// GetInstances 获取可用的服务列表(会执行路由链,默认去掉隔离以及不健康的服务实例)
GetInstances(req *GetInstancesRequest) (*model.InstancesResponse, error)
// GetAllInstances 获取完整的服务列表(包括隔离及不健康的服务实例)
GetAllInstances(req *GetAllInstancesRequest) (*model.InstancesResponse, error)
// GetRouteRule 同步获取服务路由规则
GetRouteRule(req *GetServiceRuleRequest) (*model.ServiceRuleResponse, error)
// UpdateServiceCallResult 上报服务调用结果
UpdateServiceCallResult(req *ServiceCallResult) error
// WatchService 订阅服务消息
WatchService(req *WatchServiceRequest) (*model.WatchServiceResponse, error)
// GetServices 根据业务同步获取批量服务
GetServices(req *GetServicesRequest) (*model.ServicesResponse, error)
// InitCalleeService 初始化服务运行中需要的被调服务
InitCalleeService(req *InitCalleeServiceRequest) error
// Destroy 销毁API,销毁后无法再进行调用
Destroy()
}
实例化 api.SDKContext
- NewSDKContext() -> 根据默认配置文件
./polaris.yaml
创建SDK上下文- api.InitContextByConfig(config.NewDefaultConfigurationWithDomain())
- NewSDKContextByAddress(address …string) -> 根据address创建SDK上下文
- api.InitContextByConfig(config.NewDefaultConfiguration(address))
- NewSDKContextByConfig(cfg config.Configuration) -> 根据配置创建SDK上下文
- api.InitContextByConfig(cfg)
// SDKContext .
// @brief SDK配置对象,每个API实例都会挂载一个context,包含:
// 插件实例列表
// 配置实例
// 执行流程引擎,包括定时器等
type SDKContext interface {
// Destroy
// @brief 销毁SDK上下文
Destroy()
// IsDestroyed
// @brief SDK上下文是否已经销毁
IsDestroyed() bool
// GetConfig
// @brief 获取全局配置信息
GetConfig() config.Configuration
// GetPlugins
// @brief 获取插件列表
GetPlugins() plugin.Manager
// GetEngine
// @brief 获取执行引擎
GetEngine() model.Engine
// GetValueContext
// @brief 获取值上下文
GetValueContext() model.ValueContext
}
router
实例化 RouterAPI
- NewRouterAPI() -> 根据默认配置文件
./polaris.yaml
实例化- NewRouterAPI->NewRouterAPIByConfig->api.InitContextByConfig->
&routerAPI{context}
- NewRouterAPI->NewRouterAPIByConfig->api.InitContextByConfig->
- NewRouterAPIByFile(path string) -> 根据指定配置文件实例化
- NewRouterAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->
&routerAPI{context}
- NewRouterAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->
- NewRouterAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
- NewRouterAPIByConfig->api.InitContextByConfig->
&routerAPI{context}
- NewRouterAPIByConfig->api.InitContextByConfig->
- NewRouterAPIByContext(context api.SDKContext) -> 根据指定
api.SDKContext
实例化- NewRouterAPIByContext->
&routerAPI{context}
- NewRouterAPIByContext->
- NewRouterAPIByAddress(address …string) -> 根据指定
polaris-server
地址实例化- NewRouterAPIByAddress->config.NewDefaultConfiguration(address)->NewRouterAPIByConfig->api.InitContextByConfig(cfg)->
&routerAPI{context}
- NewRouterAPIByAddress->config.NewDefaultConfiguration(address)->NewRouterAPIByConfig->api.InitContextByConfig(cfg)->
以上所有方法最终都会走到api.InitContextByConfig(cfg config.Configuration)
方法, 然后返回&routerAPI{context}
相关接口定义
// RouterAPI 路由API方法
type RouterAPI interface {
// ProcessRouters process routers to filter instances
ProcessRouters(*ProcessRoutersRequest) (*model.InstancesResponse, error)
// ProcessLoadBalance process load balancer to get the target instances
ProcessLoadBalance(*ProcessLoadBalanceRequest) (*model.OneInstanceResponse, error)
}
limiter
实例化 LimitAPI
- NewLimitAPI() -> 根据默认配置文件
./polaris.yaml
实例化- api.newLimitAPI->api.newLimitAPIByConfig->api.InitContextByConfig->
&limitAPI{context}
- api.newLimitAPI->api.newLimitAPIByConfig->api.InitContextByConfig->
- NewLimitAPIByFile(path string) -> 根据指定配置文件实例化
- api.newLimitAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->
&limitAPI{context}
- api.newLimitAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->
- NewLimitAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
- api.newLimitAPIByConfig->api.InitContextByConfig->
&limitAPI{context}
- api.newLimitAPIByConfig->api.InitContextByConfig->
- NewLimitAPIByContext(context api.SDKContext) -> 根据指定
api.SDKContext
实例化- api.newLimitAPIByContext->
&limitAPI{context}
- api.newLimitAPIByContext->
- NewLimitAPIByAddress(address …string) -> 根据指定
polaris-server
地址实例化- api.newLimitAPIByAddress->[config.NewDefaultConfiguration]->api.newLimitAPIByConfig->api.InitContextByConfig->
&limitAPI{context}
- api.newLimitAPIByAddress->[config.NewDefaultConfiguration]->api.newLimitAPIByConfig->api.InitContextByConfig->
以上所有方法最终都会走到api.InitContextByConfig(cfg config.Configuration)
方法, 然后返回&limitAPI{context}
相关接口定义
// LimitAPI 限流相关的API相关接口
type LimitAPI interface {
// GetQuota 获取限流配额,一次接口只获取一个配额
GetQuota(request QuotaRequest) (QuotaFuture, error)
// Destroy 销毁API,销毁后无法再进行调用
Destroy()
}
config
实例化 ConfigAPI
- NewConfigAPI() -> 根据默认配置文件
./polaris.yaml
实例化- api.newConfigFileAPI->api.newConfigFileAPIByConfig->api.InitContextByConfig->
&configAPI{rawAPI: rawAPI}
- api.newConfigFileAPI->api.newConfigFileAPIByConfig->api.InitContextByConfig->
- NewConfigAPIByFile(path string) -> 根据指定配置文件实例化
- api.newConfigFileAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->
&configAPI{rawAPI: rawAPI}
- api.newConfigFileAPIByFile->api.InitContextByFile->api.InitContextByStream->api.InitContextByConfig->
- NewConfigAPIByConfig(cfg config.Configuration) -> 根据指定配置实例化
- api.newConfigFileAPIByConfig->api.InitContextByConfig->
&configAPI{rawAPI: rawAPI}
- api.newConfigFileAPIByConfig->api.InitContextByConfig->
- NewConfigAPIByContext(context api.SDKContext) -> 根据指定
api.SDKContext
实例化- api.newConfigFileAPIBySDKContext->
&configAPI{rawAPI: rawAPI}
- api.newConfigFileAPIBySDKContext->
以上前3个方法最终都会调用 api.InitContextByConfig(cfg config.Configuration)
方法初始化SDKContext
, 然后返回&configAPI{rawAPI: rawAPI}
相关接口定义
// ConfigAPI 配置文件的 API.
type ConfigAPI interface {
// GetConfigFile 获取配置文件
GetConfigFile(namespace, fileGroup, fileName string) (ConfigFile, error)
}
// ConfigFile 文本类型配置文件对象
type ConfigFile interface {
ConfigFileMetadata
// GetContent 获取配置文件内容
GetContent() string
// HasContent 是否有配置内容
HasContent() bool
// AddChangeListenerWithChannel 增加配置文件变更监听器
AddChangeListenerWithChannel(chan ConfigFileChangeEvent)
// AddChangeListener 增加配置文件变更监听器
AddChangeListener(cb OnConfigFileChange)
}