如何实现 Service Weaver 部署器
声明
作者能力有限,如感觉有翻译不准确的请移步原文。
原文地址:https://serviceweaver.dev/blog/deployers.html 。
Service Weaver 允许您以多种不同的方式部署应用程序。例如,您可以在单个进程 、跨多个进程 或云 中部署应用程序。毫不奇怪,部署 Service Weaver 应用程序的代码称为deployer。这篇博文解释了部署器是什么以及如何实施部署器。我们假设您熟悉如何编写 Service Weaver 应用程序。如果不是,我们建议您阅读分步教程 。
概述
Service Weaver 应用程序由许多组件 组成。该应用程序被编译成单个应用程序二进制文件。部署者通过多次运行二进制文件来部署应用程序,通常是跨多台机器。二进制文件的每个实例都运行组件的一个子集。为了了解要运行哪些组件,二进制链接在一个称为 weavelet 的小型后台代理中,部署者使用envelope与之通信。如下图所示。
在这篇博文中,我们提供了对weavelets
、deployers
和envelopes
的高级概述。然后,我们通过完全从头开始实施多进程部署程序,深入了解它们的工作原理。
Weavelets
要了解部署器,我们必须首先了解小波。Service Weaver 应用程序被编译成单个可执行二进制文件。链接到二进制文件中的 Service Weaver 库包括一个称为weavelet的小代理,它是在您调用 weaver.Init
时创建的。Weavelet 的主要职责是启动和管理一组组件。
部署 Service Weaver 应用程序时,不只有一个 weavelet。如果有只有一个,Service Weaver 应用程序就不会非常分散。相反,部署人员会多次运行您的二进制文件——在不同机器上的不同进程中以启动多个 weavelet,这些 weavelet 协同工作以执行您的分布式应用程序。
每个 weavelet 都承载一组可能不同的组件。因为组件是复制的,所以一个组件可能由多个weavelets托管。例如,考虑一个包含组件A
、B
和的应用程序C
。下图显示了一个由三个weavelets组成的示例部署。weavelet 1 托管组件A
和B
;weavelet 2 承载组件B
和 C
,weavelet 3 承载组件C
。
您还会注意到每个小波都有一个唯一的网络地址。Weavelets 使用这些地址来执行远程方法调用。例如,假设图中 weavelet 1上的A
组件想要调用组件C
上的方法。weavelet 1 将联系地址 2.2.2.2 上的 weavelet 2 或地址 3.3.3.3 上的 weavelet 3 以执行该方法。
部署者
部署者通过启动和管理一组 weavelet 来分发 Service Weaver 应用程序。管理 weavelet 涉及与 (1) 组件、(2) 侦听器和 (3) 遥测相关的三个主要职责。
- 组件。部署者启动 weavelets 并告诉他们要托管哪些组件。部署者还确保 weavelets 知道其他 weavelets 的地址。例如,如果部署者启动了一个新的 weavelet,则部署者会通知所有其他 weavelet 新 weavelet 的存在,包括其地址和它托管的组件。相反,如果一个部署者检测到一个 weavelet 已经失败,部署者会通知所有其他 weavelet 它的失败。
- 侦听器。当组件想要为外部流量提供服务时,它会请求网络侦听器。部署者为侦听器选择一个地址并确保侦听器可公开访问。多个 weavelet 可能共享同一个侦听器,部署者必须确保它们之间的流量平衡。这通常涉及运行或配置代理。
- 遥测。部署者收集、聚合和导出由 weavelet 生成的所有遥测数据。这包括日志、指标、跟踪和配置文件。
部署者和 weavelet 通过一对 Unix 管道交换协议缓冲区进行通信。我们将与 weavelet 通信的部署器部分称为envelope。在本文的附录中,我们描述了 envelope-weavelet 协议的底层细节,但对于大多数部署者来说,使用 ServiceWeaver 的Envelope
API 应该就足够了。
envelope和weavelet之间的通信要么是weavelet发起的,要么是envelope发起的。Weavelet 发起的通信显示为EnvelopeHandler
对部署者实现提供的接口的方法调用。
type EnvelopeHandler interface {
// Components.
ActivateComponent(context.Context, *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error)
// Listeners.
GetListenerAddress(context.Context, *protos.GetListenerAddressRequest) (*protos.GetListenerAddressReply, error)
ExportListener(context.Context, *protos.ExportListenerRequest) (*protos.ExportListenerReply, error)
// Telemetry.
HandleLogEntry(context.Context, *protos.LogEntry) error
HandleTraceSpans(context.Context, []trace.ReadOnlySpan) error
}
envelope发起的通信是通过调用 上的方法来执行的 Envelope
。
// Components.
func (e *Envelope) UpdateRoutingInfo(routing *protos.RoutingInfo) error
func (e *Envelope) UpdateComponents(components []string) error
// Telemetry.
func (e *Envelope) GetHealth() protos.HealthStatus
func (e *Envelope) GetMetrics() ([]*metrics.MetricSnapshot, error)
func (e *Envelope) GetLoad() (*protos.LoadReport, error)
func (e *Envelope) GetProfile(req *protos.GetProfileRequest) ([]byte, error)
一个简单的多进程部署器
在本节中,我们将实现一个功能完备的多进程部署器
。我们会将我们的部署程序编译成一个名为 deploy
. 然后我们将能够通过运行来部署 Service Weaver 二进制文件 ./deploy <Service Weaver binary>
。为简单起见,我们的部署人员不会共同定位或复制任何组件。每个组件都将在一个单独的 weavelet 中自行运行。我们首先为部署程序和 weavelet 声明类型。
package main
import ...
// deployer is a simple multiprocess deployer that doesn't implement
// co-location or replication. That is, every component is run in its own OS
// process, and there is only one replica of every component.
type deployer struct {
mu sync.Mutex // guards handlers
handlers map[string]*handler // handlers, by component
}
// A handler handles messages from a weavelet. It implements the
// EnvelopeHandler interface.
type handler struct {
deployer *deployer // underlying deployer
envelope *envelope.Envelope // envelope to the weavelet
address string // weavelet's address
}
// Check that handler implements the envelope.EnvelopeHandler interface.
var _ envelope.EnvelopeHandler = &handler{}
接下来,我们实现一个spawn
方法来生成一个 weavelet 来托管一个组件。
- 为了生成
weavelet
并Envelope
与它通信,我们调用该envelope.NewEnvelope
函数。此函数接受EnvelopeInfo
传递给 weavelet 的 和AppConfig
描述应用程序的 。在子进程中NewEnvelope
运行提供的 Service Weaver 二进制文件(在本例中)。flag.Arg(0)
然后它返回一个Envelope
通过一对 Unix 管道与weavelet
通信的 an。 - 我们调用该
UpdateComponents
方法来告诉weavelet
运行哪个组件。部署者应该在组件集发生变化时调用时调用UpdateComponents
通知weavelet
。 - 我们调用
envelope.Serve
来处理来自weavelet
的请求。
// spawn spawns a weavelet to host the provided component (if one hasn't
// already spawned) and returns a handler to the weavelet.
func (d *deployer) spawn(component string) (*handler, error) {
d.mu.Lock()
defer d.mu.Unlock()
// Check if a weavelet has already been spawned.
h, ok := d.handlers[component]
if ok {
// The weavelet has already been spawned.
return h, nil
}
// Spawn a weavelet in a subprocess to host the component.
info := &protos.EnvelopeInfo{
App: "app", // the application name
DeploymentId: deploymentId, // the deployment id
Id: uuid.New().String(), // the weavelet id
SingleProcess: false, // is the app a single process?
SingleMachine: true, // is the app on a single machine?
RunMain: component == "main", // should the weavelet run main?
}
config := &protos.AppConfig{
Name: "app", // the application name
Binary: flag.Arg(0), // the application binary
}
envelope, err := envelope.NewEnvelope(context.Background(), info, config)
if err != nil {
return nil, err
}
h = &handler{
deployer: d,
envelope: envelope,
address: envelope.WeaveletInfo().DialAddr,
}
go func() {
// Inform the weavelet of the component it should host.
envelope.UpdateComponents([]string{component})
// Handle messages from the weavelet.
envelope.Serve(h)
}()
// Return the handler.
d.handlers[component] = h
return h, nil
}
现在,我们实现这些EnvelopeHandler
方法,这些方法处理 weavelet 发起的与部署者的通信。
对于每种EnvelopeHandler
方法,我们还总结了该方法的实现方式weaver multi
和实现方式,以便您更好地了解更高级的部署人员如何实现这些方法。 weaver gke
组件部分
首先,我们实施ActivateComponent
. 当由 weavelet 承载的组件调用weaver.Get[T]
获取某个组件时T
,weavelet 调用 ActivateComponent
方法激活T
。ActivateComponent
应该启动组件——可能有多个副本——如果它还没有启动的话。
我们的处理程序调用deployer.spawn
生成一个新的 weavelet
来托管组件。处理程序然后调用UpdateRoutingInfo
以将新产生的weavelet
地址通知所有的weavelet
。这允许请求 weavelet 上的组件与新生成的 weavelet 上的组件执行 RPC。
以上图为例,如果组件A
在weavelet 1 上的组件激活了 组件 C
,那么部署者会生成 weavelet 2 和 3(如果它们尚未生成),然后告诉 weavelet 1 服务 weavelet 2 和 3 的地址。
每当 weavelet 调用 ActivateComponent 的组件的路由信息发生变化时,部署者就应该调用 UpdateRoutingInfo。例如,如果部署者检测到一个 weavelet 托管组件A
已经崩溃,它应该在所有已调用 A 上的 ActivateComponent 的weavelet组件上调用 UpdateRoutingInfo,并使用新的路由信息省略失败的weavelet组件的地址。
// Responsibility 1: Components.
func (h *handler) ActivateComponent(_ context.Context, req *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error) {
// Spawn a weavelet to host the component, if one hasn't already been
// spawned.
spawned, err := h.deployer.spawn(req.Component)
if err != nil {
return nil, err
}
// Tell the weavelet the address of the requested component.
h.envelope.UpdateRoutingInfo(&protos.RoutingInfo{
Component: req.Component,
Replicas: []string{spawned.address},
})
return &protos.ActivateComponentReply{}, nil
}
weaver multi
,就像我们的部署者一样,在子流程中生成weavelet。在Kubernetes 部署
weaver gke
中生成小波。
侦听器
接下来,我们实现侦听器方法。当组件请求网络侦听器时,GetListenerAddress
将调用该方法。此方法返回组件应侦听的地址。我们简单的部署器总是返回 "localhost:0"
。
在 weavelet 从 接收到地址后GetListenerAddress
,它会在该地址上创建一个网络侦听器,并ExportListener
使用它正在侦听的具体地址调用该方法。例如,在 weavelet "localhost:0"
从我们的GetListenerAddress
实现中接收后,它会监听 "localhost:0"
. 这会产生一个可拨号地址,例如"127.0.0.1:35879"
,然后 weavelet 将其报告给处理ExportListener
程序。我们简单的部署器只是打印出这个地址供用户直接联系。
// Responsibility 2: Listeners.
func (h *handler) GetListenerAddress(_ context.Context, req *protos.GetListenerAddressRequest) (*protos.GetListenerAddressReply, error) {
return &protos.GetListenerAddressReply{Address: "localhost:0"}, nil
}
func (h *handler) ExportListener(_ context.Context, req *protos.ExportListenerRequest) (*protos.ExportListenerReply, error) {
// This simplified deployer does not proxy network traffic. Listeners
// should be contacted directly.
fmt.Printf("Weavelet listening on %s\n", req.Address)
return &protos.ExportListenerReply{}, nil
}
weaver multi
GetListenerAddress
总是返回的实施"localhost:0"
。它在传递给Listener的
ListenerOptions
字段ExportListener
中指定的地址上运行本地 HTTP 代理。此代理平衡报告给 的侦听器地址之间的流量。通过在 Google Cloud 中配置负载平衡器来实现侦听器。 LocalAddress``ExportListener
weaver gke
遥测
最后,我们实现遥测方法。该方法接收由小波产生的所有日志HandleLogEntry
。我们的部署人员使用 Service Weaver 库中的漂亮打印机logging
将日志打印到标准输出。类似地,小波产生的所有踪迹都被函数接收HandleTraceSpans
。为简单起见,我们的部署者忽略了痕迹。
// Responsibility 3: Telemetry.
func (h *handler) HandleLogEntry(_ context.Context, entry *protos.LogEntry) error {
pp := logging.NewPrettyPrinter(colors.Enabled())
fmt.Println(pp.Format(entry))
return nil
}
func (h *handler) HandleTraceSpans(context.Context, []trace.ReadOnlySpan) error {
// This simplified deployer drops traces on the floor.
return nil
}
weaver multi
将日志和跟踪写入文件。weaver gke
将日志和跟踪导出到Cloud Logging
和Cloud Trace
。
主要的
main
最后,我们为部署者实现一个功能。我们创建一个 deployer
,生成主要组件,然后阻止。
func main() {
flag.Parse()
d := &deployer{handlers: map[string]*handler{}}
d.spawn("main")
select {} // block forever
}
如果我们编译我们的部署程序,我们可以将一个 Service Weaver 二进制文件传递给它进行部署。
$ go build -o deploy main.go # compile the deployer
$ ./deploy <Service Weaver binary> # deploy an application
高级部署程序功能
上一节中的多进程部署器被设计得尽可能简单。另一方面,现实世界的部署人员需要许多更高级的功能。列举和解释如何实现这些功能超出了本博文的范围,但我们将在此处总结一些高级功能。weaver multi
您还可以查看我们和部署者的实现weaver gke
作为参考。
- 长寿和坚持。上一节中的多进程部署器只在它部署的应用程序存在时存在。真实世界的部署者应该是长期运行和容错的服务。
weaver gke
例如,部署者在 Kubernetes 集群中运行一个长时间运行的控制器作业,该作业将其状态持久保存到一个高度一致的数据存储中。它还在部署应用程序的每个集群中运行一个长时间运行的从属作业。当您运行weaver gke deploy
部署应用程序时,该应用程序将发送到控制器,控制器又将其分发给下属。 - 故障检测。部署者应该检测一个 weavelet 何时失败并相应地通知其他 weavelet。您可以使用该
Envelope.GetHealth
方法检查小波的健康状况。多机部署者将不得不实施自己的健康检查以检测机器故障。 - 路由。部署者应该通过监视路由组件上的负载并生成平衡此负载的路由分配来支持
路由组件。
您可以使用该 方法获取小波的负载。
Envelope.GetLoad
- 推出。部署者应该实施版本化推出 ,允许推出应用程序的一个版本以替代先前运行的版本。
- 工装。部署者应该提供工具来检查和调试应用程序的状态。
weaver multi status
、weaver multi dashboard
和weaver multi logs
,例如,可用于检查使用 部署的应用程序weaver multi deploy
。
附录:信封-小波协议
在本附录中,我们描述了信封和小波之间的低级通信协议。大多数部署程序应该使用前面描述的高级 Envelope
API,但是如果您想完全从头开始实施部署程序(比如使用另一种编程语言),了解低级细节很重要。
信封和小波通过一对 Unix 管道相互通信。一根管道从包络线延伸到小波,另一根从小波向相反方向延伸到包络线。信封和 weavelet 通过这些管道交换EnvelopeMsg 和WeaveletMsg 协议缓冲区。
当信封和小波首先建立它们的连接时,它们会执行一次 握手。信封发送一个EnvelopeInfo
,小波响应一个WeaveletInfo
。握手后,信封和小波自由通信。存在三种通信模式。
- 信封可以启动针对小波的 RPC。
- Weavelet 可以针对信封启动 RPC。
- Weavelet 可以将未确认的 RPC 发送到信封。
这三种通信形式可以任意交错。例如,在信封通过管道发送 RPC 请求后,它可能会在收到 RPC 回复之前通过管道接收到许多不相关的消息。
为了使这种交互具体化,我们将实施世界上最简单的部署程序
。这个简单的部署程序启动一个 Service Weaver 二进制文件,通过管道与 weavelet 交换一些消息,然后终止。main
我们从一个接收 Service Weaver 二进制文件作为其第一个也是唯一一个参数并将其传递给函数的函数开始run
。
package main
import ...
func main() {
flag.Parse()
if err := run(context.Background(), flag.Arg(0)); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
为了实现这个run
功能,我们首先创建一对管道。第一条管道从envelopeWriter
包络中延伸到weaveletReader
小波中。第二个以相反的方向从weaveletWriter
小波中运行到envelopeReader
包络中。我们在子进程中启动二进制文件。weaveletReader
管道的两端weaveletWriter
将在子进程中作为文件描述符 3 和 4 访问(有关详细信息,请参阅exec 包
)。ENVELOPE_TO_WEAVELET_FD
在子进程中运行的 weavelet 打开存储在和 环境变量中的文件描述符WEAVELET_TO_ENVELOPE_FD
,以建立与信封的连接。
func run(ctx context.Context, binary string) error {
// Step 1. Run the binary and establish the pipes between the envelope and
// the weavelet.
//
// envelope weavelet
// ┌──────────────────┐ ┌───────────────────┐
// │ envelopeWriter --│---│-> weaveletReader │
// │ envelopeReader <-│---│-- weaveletWriter │
// └──────────────────┘ └───────────────────┘
weaveletReader, envelopeWriter, err := os.Pipe()
if err != nil {
return err
}
envelopeReader, weaveletWriter, err := os.Pipe()
if err != nil {
return err
}
// ExtraFiles file descriptors begin at 3 because descriptors 0, 1, and 2
// are reserved for stdin, stdout, and stderr. See
// https://pkg.go.dev/os/exec#Cmd for details.
cmd := exec.Command(binary)
cmd.ExtraFiles = []*os.File{weaveletReader, weaveletWriter}
cmd.Env = []string{"ENVELOPE_TO_WEAVELET_FD=3", "WEAVELET_TO_ENVELOPE_FD=4"}
if err := cmd.Start(); err != nil {
return err
}
// ...
}
其次,信封将EnvelopeMsg
包含 an 的 an发送EnvelopeInfo
到 weavelet。这是小波期望收到的第一条消息。An EnvelopeInfo
为 weavelet 提供一组基本的元数据,包括应用程序名称、部署的唯一 ID、Weavelet 的唯一 ID 等。
func run(ctx context.Context, binary string) error {
// Step 1...
// Step 2. Send an EnvelopeInfo to the weavelet.
info := &protos.EnvelopeMsg{
EnvelopeInfo: &protos.EnvelopeInfo{
App: "app", // the application name
DeploymentId: uuid.New().String(), // the deployment id
Id: uuid.New().String(), // the weavelet id
SingleProcess: false, // is the app a single process?
SingleMachine: true, // is the app on a single machine?
RunMain: true, // should the weavelet run main?
},
}
if err := protomsg.Write(envelopeWriter, info); err != nil {
return err
}
// ...
}
第三,部署者WeaveletMsg
从 weavelet 中读取并打印 a。
func run(ctx context.Context, binary string) error {
// Step 1...
// Step 2...
// Step 3. Receive a WeaveletInfo from the weavelet.
var reply protos.WeaveletMsg
if err := protomsg.Read(envelopeReader, &reply); err != nil {
return err
}
fmt.Println(prototext.Format(&reply))
// ...
}
这WeaveletMsg
包含一个WeaveletInfo
包含有关 weavelet 的信息,特别是它的地址和 PID:
weavelet_info: {
dial_addr: "tcp://127.0.0.1:41123"
pid: 2193420
}
第四,部署者发起一个 RPC 来接收 weavelet 的健康状态。42
它为 RPC选择一个唯一的 ID 。
func run(ctx context.Context, binary string) error {
// Step 1...
// Step 2...
// Step 3...
// Step 4. Send a GetHealth RPC to the weavelet.
req := &protos.EnvelopeMsg{
Id: 42,
GetHealthRequest: &protos.GetHealthRequest{},
}
if err := protomsg.Write(envelopeWriter, req); err != nil {
return err
}
// ...
}
第五,部署者重复WeaveletMsg
从 weavelet 中读取 s 直到它收到一个 id -42
。每个具有正 id 的 RPCx
都会收到一个具有负 id 的回复-x
。请注意,在收到对其 RPC 的回复之前,部署者可能会收到来自 weavelet 的其他消息。一个真正的部署者会处理这些消息,但我们的普通部署者会简单地忽略它们。
func run(ctx context.Context, binary string) error {
// Step 1...
// Step 2...
// Step 3...
// Step 4...
// Step 5. Receive a reply to the GetHealth RPC, ignoring other messages.
for {
var reply protos.WeaveletMsg
if err := protomsg.Read(envelopeReader, &reply); err != nil {
return err
}
if reply.Id == -42 {
fmt.Println(prototext.Format(&reply))
break
}
}
return nil
}
回复看起来像这样:
id: -42
get_health_reply: {
status: HEALTHY
}
这个简单的部署器演示了部署器和 weavelet 如何通过一对管道发送 protobufs 进行通信。有关更多详细信息,请参阅 runtime.proto 。
欢迎关注微信公众号,第一时间收到更新推送。