如何实现 Service Weaver 部署器

2022-02-15
7分钟阅读时长

声明

作者能力有限,如感觉有翻译不准确的请移步原文。

原文地址:https://serviceweaver.dev/blog/deployers.html

Service Weaver 允许您以多种不同的方式部署应用程序。例如,您可以在单个进程跨多个进程 中部署应用程序。毫不奇怪,部署 Service Weaver 应用程序的代码称为deployer。这篇博文解释了部署器是什么以及如何实施部署器。我们假设您熟悉如何编写 Service Weaver 应用程序。如果不是,我们建议您阅读分步教程

概述

Service Weaver 应用程序由许多组件 组成。该应用程序被编译成单个应用程序二进制文件。部署者通过多次运行二进制文件来部署应用程序,通常是跨多台机器。二进制文件的每个实例都运行组件的一个子集。为了了解要运行哪些组件,二进制链接在一个称为 weavelet 的小型后台代理中,部署者使用envelope与之通信。如下图所示。

部署器的架构图。 具有组件 A、B 和 C 的应用程序被编译成二进制文件,由部署者跨三个 weavelet 部署。

在这篇博文中,我们提供了对weaveletsdeployersenvelopes的高级概述。然后,我们通过完全从头开始实施多进程部署程序,深入了解它们的工作原理。

Weavelets

要了解部署器,我们必须首先了解小波。Service Weaver 应用程序被编译成单个可执行二进制文件。链接到二进制文件中的 Service Weaver 库包括一个称为weavelet的小代理,它是在您调用 weaver.Init 时创建的。Weavelet 的主要职责是启动和管理一组组件。

部署 Service Weaver 应用程序时,不只有一个 weavelet。如果有只有一个,Service Weaver 应用程序就不会非常分散。相反,部署人员会多次运行您的二进制文件——在不同机器上的不同进程中以启动多个 weavelet,这些 weavelet 协同工作以执行您的分布式应用程序。

每个 weavelet 都承载一组可能不同的组件。因为组件是复制的,所以一个组件可能由多个weavelets托管。例如,考虑一个包含组件AB和的应用程序C。下图显示了一个由三个weavelets组成的示例部署。weavelet 1 托管组件AB;weavelet 2 承载组件BC,weavelet 3 承载组件C

三个 weavelet 托管组件 A、B 和 C 的不同子集。每个 weavelet 都有自己的网络地址。

您还会注意到每个小波都有一个唯一的网络地址。Weavelets 使用这些地址来执行远程方法调用。例如,假设图中 weavelet 1上的A组件想要调用组件C 上的方法。weavelet 1 将联系地址 2.2.2.2 上的 weavelet 2 或地址 3.3.3.3 上的 weavelet 3 以执行该方法。

部署者

部署者通过启动和管理一组 weavelet 来分发 Service Weaver 应用程序。管理 weavelet 涉及与 (1) 组件、(2) 侦听器和 (3) 遥测相关的三个主要职责。

  1. 组件。部署者启动 weavelets 并告诉他们要托管哪些组件。部署者还确保 weavelets 知道其他 weavelets 的地址。例如,如果部署者启动了一个新的 weavelet,则部署者会通知所有其他 weavelet 新 weavelet 的存在,包括其地址和它托管的组件。相反,如果一个部署者检测到一个 weavelet 已经失败,部署者会通知所有其他 weavelet 它的失败。
  2. 侦听器。当组件想要为外部流量提供服务时,它会请求网络侦听器。部署者为侦听器选择一个地址并确保侦听器可公开访问。多个 weavelet 可能共享同一个侦听器,部署者必须确保它们之间的流量平衡。这通常涉及运行或配置代理。
  3. 遥测。部署者收集、聚合和导出由 weavelet 生成的所有遥测数据。这包括日志、指标、跟踪和配置文件。

部署者和 weavelet 通过一对 Unix 管道交换协议缓冲区进行通信。我们将与 weavelet 通信的部署器部分称为envelope。在本文的附录中,我们描述了 envelope-weavelet 协议的底层细节,但对于大多数部署者来说,使用 ServiceWeaver 的Envelope API 应该就足够了。

envelope和weavelet之间的通信要么是weavelet发起的,要么是envelope发起的。Weavelet 发起的通信显示为EnvelopeHandler 对部署者实现提供的接口的方法调用。

type EnvelopeHandler interface {
    // Components.
    ActivateComponent(context.Context, *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error)

    // Listeners.
    GetListenerAddress(context.Context, *protos.GetListenerAddressRequest) (*protos.GetListenerAddressReply, error)
    ExportListener(context.Context, *protos.ExportListenerRequest) (*protos.ExportListenerReply, error)

    // Telemetry.
    HandleLogEntry(context.Context, *protos.LogEntry) error
    HandleTraceSpans(context.Context, []trace.ReadOnlySpan) error
}

envelope发起的通信是通过调用 上的方法来执行的 Envelope

// Components.
func (e *Envelope) UpdateRoutingInfo(routing *protos.RoutingInfo) error
func (e *Envelope) UpdateComponents(components []string) error

// Telemetry.
func (e *Envelope) GetHealth() protos.HealthStatus
func (e *Envelope) GetMetrics() ([]*metrics.MetricSnapshot, error)
func (e *Envelope) GetLoad() (*protos.LoadReport, error)
func (e *Envelope) GetProfile(req *protos.GetProfileRequest) ([]byte, error)

一个简单的多进程部署器

在本节中,我们将实现一个功能完备的多进程部署器 。我们会将我们的部署程序编译成一个名为 deploy. 然后我们将能够通过运行来部署 Service Weaver 二进制文件 ./deploy <Service Weaver binary>。为简单起见,我们的部署人员不会共同定位或复制任何组件。每个组件都将在一个单独的 weavelet 中自行运行。我们首先为部署程序和 weavelet 声明类型。

package main

import ...

// deployer is a simple multiprocess deployer that doesn't implement
// co-location or replication. That is, every component is run in its own OS
// process, and there is only one replica of every component.
type deployer struct {
    mu       sync.Mutex          // guards handlers
    handlers map[string]*handler // handlers, by component
}

// A handler handles messages from a weavelet. It implements the
// EnvelopeHandler interface.
type handler struct {
    deployer *deployer          // underlying deployer
    envelope *envelope.Envelope // envelope to the weavelet
    address  string             // weavelet's address
}

// Check that handler implements the envelope.EnvelopeHandler interface.
var _ envelope.EnvelopeHandler = &handler{}

接下来,我们实现一个spawn方法来生成一个 weavelet 来托管一个组件。

  1. 为了生成 weaveletEnvelope与它通信,我们调用该envelope.NewEnvelope 函数。此函数接受 EnvelopeInfo 传递给 weavelet 的 和 AppConfig 描述应用程序的 。在子进程中NewEnvelope运行提供的 Service Weaver 二进制文件(在本例中)。flag.Arg(0)然后它返回一个Envelope通过一对 Unix 管道与 weavelet 通信的 an。
  2. 我们调用该UpdateComponents方法来告诉 weavelet 运行哪个组件。部署者应该在组件集发生变化时调用时调用UpdateComponents通知weavelet
  3. 我们调用envelope.Serve来处理来自 weavelet 的请求。
// spawn spawns a weavelet to host the provided component (if one hasn't
// already spawned) and returns a handler to the weavelet.
func (d *deployer) spawn(component string) (*handler, error) {
    d.mu.Lock()
    defer d.mu.Unlock()

    // Check if a weavelet has already been spawned.
    h, ok := d.handlers[component]
    if ok {
        // The weavelet has already been spawned.
        return h, nil
    }

    // Spawn a weavelet in a subprocess to host the component.
    info := &protos.EnvelopeInfo{
        App:           "app",               // the application name
        DeploymentId:  deploymentId,        // the deployment id
        Id:            uuid.New().String(), // the weavelet id
        SingleProcess: false,               // is the app a single process?
        SingleMachine: true,                // is the app on a single machine?
        RunMain:       component == "main", // should the weavelet run main?
    }
    config := &protos.AppConfig{
        Name:   "app",       // the application name
        Binary: flag.Arg(0), // the application binary
    }
    envelope, err := envelope.NewEnvelope(context.Background(), info, config)
    if err != nil {
        return nil, err
    }
    h = &handler{
        deployer: d,
        envelope: envelope,
        address:  envelope.WeaveletInfo().DialAddr,
    }

    go func() {
        // Inform the weavelet of the component it should host.
        envelope.UpdateComponents([]string{component})

        // Handle messages from the weavelet.
        envelope.Serve(h)
    }()

    // Return the handler.
    d.handlers[component] = h
    return h, nil
}

现在,我们实现这些EnvelopeHandler方法,这些方法处理 weavelet 发起的与部署者的通信。

对于每种EnvelopeHandler方法,我们还总结了该方法的实现方式weaver multi 和实现方式,以便您更好地了解更高级的部署人员如何实现这些方法。 weaver gke

组件部分

首先,我们实施ActivateComponent. 当由 weavelet 承载的组件调用weaver.Get[T] 获取某个组件时T,weavelet 调用 ActivateComponent方法激活TActivateComponent应该启动组件——可能有多个副本——如果它还没有启动的话。

我们的处理程序调用deployer.spawn生成一个新的 weavelet 来托管组件。处理程序然后调用UpdateRoutingInfo以将新产生的weavelet地址通知所有的weavelet。这允许请求 weavelet 上的组件与新生成的 weavelet 上的组件执行 RPC。

以上图为例,如果组件A在weavelet 1 上的组件激活了 组件 C,那么部署者会生成 weavelet 2 和 3(如果它们尚未生成),然后告诉 weavelet 1 服务 weavelet 2 和 3 的地址。

每当 weavelet 调用 ActivateComponent 的组件的路由信息发生变化时,部署者就应该调用 UpdateRoutingInfo。例如,如果部署者检测到一个 weavelet 托管组件A已经崩溃,它应该在所有已调用 A 上的 ActivateComponent 的weavelet组件上调用 UpdateRoutingInfo,并使用新的路由信息省略失败的weavelet组件的地址。

// Responsibility 1: Components.
func (h *handler) ActivateComponent(_ context.Context, req *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error) {
    // Spawn a weavelet to host the component, if one hasn't already been
    // spawned.
    spawned, err := h.deployer.spawn(req.Component)
    if err != nil {
        return nil, err
    }

    // Tell the weavelet the address of the requested component.
    h.envelope.UpdateRoutingInfo(&protos.RoutingInfo{
        Component: req.Component,
        Replicas:  []string{spawned.address},
    })

    return &protos.ActivateComponentReply{}, nil
}

weaver multi ,就像我们的部署者一样,在子流程中生成weavelet。在Kubernetes 部署 weaver gke 中生成小波。

侦听器

接下来,我们实现侦听器方法。当组件请求网络侦听器时,GetListenerAddress将调用该方法。此方法返回组件应侦听的地址。我们简单的部署器总是返回 "localhost:0"

在 weavelet 从 接收到地址后GetListenerAddress,它会在该地址上创建一个网络侦听器,并ExportListener使用它正在侦听的具体地址调用该方法。例如,在 weavelet "localhost:0"从我们的GetListenerAddress实现中接收后,它会监听 "localhost:0". 这会产生一个可拨号地址,例如"127.0.0.1:35879",然后 weavelet 将其报告给处理ExportListener程序。我们简单的部署器只是打印出这个地址供用户直接联系。

// Responsibility 2: Listeners.
func (h *handler) GetListenerAddress(_ context.Context, req *protos.GetListenerAddressRequest) (*protos.GetListenerAddressReply, error) {
    return &protos.GetListenerAddressReply{Address: "localhost:0"}, nil
}

func (h *handler) ExportListener(_ context.Context, req *protos.ExportListenerRequest) (*protos.ExportListenerReply, error) {
    // This simplified deployer does not proxy network traffic. Listeners
    // should be contacted directly.
    fmt.Printf("Weavelet listening on %s\n", req.Address)
    return &protos.ExportListenerReply{}, nil
}

weaver multi GetListenerAddress总是返回的实施"localhost:0"。它在传递给Listener的 ListenerOptions 字段ExportListener中指定的地址上运行本地 HTTP 代理。此代理平衡报告给 的侦听器地址之间的流量。通过在 Google Cloud 中配置负载平衡器来实现侦听器。 LocalAddress``ExportListenerweaver gke

遥测

最后,我们实现遥测方法。该方法接收由小波产生的所有日志HandleLogEntry。我们的部署人员使用 Service Weaver 库中的漂亮打印机logging将日志打印到标准输出。类似地,小波产生的所有踪迹都被函数接收HandleTraceSpans。为简单起见,我们的部署者忽略了痕迹。

// Responsibility 3: Telemetry.
func (h *handler) HandleLogEntry(_ context.Context, entry *protos.LogEntry) error {
    pp := logging.NewPrettyPrinter(colors.Enabled())
    fmt.Println(pp.Format(entry))
    return nil
}

func (h *handler) HandleTraceSpans(context.Context, []trace.ReadOnlySpan) error {
    // This simplified deployer drops traces on the floor.
    return nil
}

weaver multi 将日志和跟踪写入文件。weaver gke 将日志和跟踪导出到Cloud LoggingCloud Trace

主要的

main最后,我们为部署者实现一个功能。我们创建一个 deployer,生成主要组件,然后阻止。

func main() {
    flag.Parse()
    d := &deployer{handlers: map[string]*handler{}}
    d.spawn("main")
    select {} // block forever
}

如果我们编译我们的部署程序,我们可以将一个 Service Weaver 二进制文件传递给它进行部署。

$ go build -o deploy main.go       # compile the deployer
$ ./deploy <Service Weaver binary> # deploy an application

高级部署程序功能

上一节中的多进程部署器被设计得尽可能简单。另一方面,现实世界的部署人员需要许多更高级的功能。列举和解释如何实现这些功能超出了本博文的范围,但我们将在此处总结一些高级功能。weaver multi 您还可以查看我们和部署者的实现weaver gke 作为参考。

  • 长寿和坚持。上一节中的多进程部署器只在它部署的应用程序存在时存在。真实世界的部署者应该是长期运行和容错的服务。weaver gke 例如,部署者在 Kubernetes 集群中运行一个长时间运行的控制器作业,该作业将其状态持久保存到一个高度一致的数据存储中。它还在部署应用程序的每个集群中运行一个长时间运行的从属作业。当您运行weaver gke deploy部署应用程序时,该应用程序将发送到控制器,控制器又将其分发给下属。
  • 故障检测。部署者应该检测一个 weavelet 何时失败并相应地通知其他 weavelet。您可以使用该 Envelope.GetHealth 方法检查小波的健康状况。多机部署者将不得不实施自己的健康检查以检测机器故障。
  • 路由。部署者应该通过监视路由组件上的负载并生成平衡此负载的路由分配来支持 路由组件。 您可以使用该 方法获取小波的负载。Envelope.GetLoad
  • 推出。部署者应该实施版本化推出 ,允许推出应用程序的一个版本以替代先前运行的版本。
  • 工装。部署者应该提供工具来检查和调试应用程序的状态。weaver multi statusweaver multi dashboardweaver multi logs,例如,可用于检查使用 部署的应用程序weaver multi deploy

附录:信封-小波协议

在本附录中,我们描述了信封和小波之间的低级通信协议。大多数部署程序应该使用前面描述的高级 Envelope API,但是如果您想完全从头开始实施部署程序(比如使用另一种编程语言),了解低级细节很重要。

信封和小波通过一对 Unix 管道相互通信。一根管道从包络线延伸到小波,另一根从小波向相反方向延伸到包络线。信封和 weavelet 通过这些管道交换EnvelopeMsgWeaveletMsg 协议缓冲区。

当信封和小波首先建立它们的连接时,它们会执行一次 握手。信封发送一个EnvelopeInfo ,小波响应一个WeaveletInfo 。握手后,信封和小波自由通信。存在三种通信模式。

  1. 信封可以启动针对小波的 RPC。
  2. Weavelet 可以针对信封启动 RPC。
  3. Weavelet 可以将未确认的 RPC 发送到信封。

这三种通信形式可以任意交错。例如,在信封通过管道发送 RPC 请求后,它可能会在收到 RPC 回复之前通过管道接收到许多不相关的消息。

为了使这种交互具体化,我们将实施世界上最简单的部署程序 。这个简单的部署程序启动一个 Service Weaver 二进制文件,通过管道与 weavelet 交换一些消息,然后终止。main我们从一个接收 Service Weaver 二进制文件作为其第一个也是唯一一个参数并将其传递给函数的函数开始run

package main

import ...

func main() {
    flag.Parse()
    if err := run(context.Background(), flag.Arg(0)); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}

为了实现这个run功能,我们首先创建一对管道。第一条管道从envelopeWriter包络中延伸到weaveletReader小波中。第二个以相反的方向从weaveletWriter小波中运行到envelopeReader包络中。我们在子进程中启动二进制文件。weaveletReader管道的两端weaveletWriter将在子进程中作为文件描述符 3 和 4 访问(有关详细信息,请参阅exec 包 )。ENVELOPE_TO_WEAVELET_FD在子进程中运行的 weavelet 打开存储在和 环境变量中的文件描述符WEAVELET_TO_ENVELOPE_FD,以建立与信封的连接。

func run(ctx context.Context, binary string) error {
    // Step 1. Run the binary and establish the pipes between the envelope and
    // the weavelet.
    //
    //              envelope               weavelet
    //             ┌──────────────────┐   ┌───────────────────┐
    //             │ envelopeWriter --│---│-> weaveletReader  │
    //             │ envelopeReader <-│---│-- weaveletWriter  │
    //             └──────────────────┘   └───────────────────┘
    weaveletReader, envelopeWriter, err := os.Pipe()
    if err != nil {
        return err
    }
    envelopeReader, weaveletWriter, err := os.Pipe()
    if err != nil {
        return err
    }

    // ExtraFiles file descriptors begin at 3 because descriptors 0, 1, and 2
    // are reserved for stdin, stdout, and stderr. See
    // https://pkg.go.dev/os/exec#Cmd for details.
    cmd := exec.Command(binary)
    cmd.ExtraFiles = []*os.File{weaveletReader, weaveletWriter}
    cmd.Env = []string{"ENVELOPE_TO_WEAVELET_FD=3", "WEAVELET_TO_ENVELOPE_FD=4"}
    if err := cmd.Start(); err != nil {
        return err
    }

    // ...
}

其次,信封将EnvelopeMsg包含 an 的 an发送EnvelopeInfo到 weavelet。这是小波期望收到的第一条消息。An EnvelopeInfo为 weavelet 提供一组基本的元数据,包括应用程序名称、部署的唯一 ID、Weavelet 的唯一 ID 等。

func run(ctx context.Context, binary string) error {
    // Step 1...

    // Step 2. Send an EnvelopeInfo to the weavelet.
    info := &protos.EnvelopeMsg{
        EnvelopeInfo: &protos.EnvelopeInfo{
            App:           "app",               // the application name
            DeploymentId:  uuid.New().String(), // the deployment id
            Id:            uuid.New().String(), // the weavelet id
            SingleProcess: false,               // is the app a single process?
            SingleMachine: true,                // is the app on a single machine?
            RunMain:       true,                // should the weavelet run main?
        },
    }
    if err := protomsg.Write(envelopeWriter, info); err != nil {
        return err
    }

    // ...
}

第三,部署者WeaveletMsg从 weavelet 中读取并打印 a。

func run(ctx context.Context, binary string) error {
    // Step 1...
    // Step 2...

    // Step 3. Receive a WeaveletInfo from the weavelet.
    var reply protos.WeaveletMsg
    if err := protomsg.Read(envelopeReader, &reply); err != nil {
        return err
    }
    fmt.Println(prototext.Format(&reply))

    // ...
}

WeaveletMsg包含一个WeaveletInfo包含有关 weavelet 的信息,特别是它的地址和 PID:

weavelet_info: {
    dial_addr: "tcp://127.0.0.1:41123"
    pid: 2193420
}

第四,部署者发起一个 RPC 来接收 weavelet 的健康状态。42它为 RPC选择一个唯一的 ID 。

func run(ctx context.Context, binary string) error {
    // Step 1...
    // Step 2...
    // Step 3...

    // Step 4. Send a GetHealth RPC to the weavelet.
    req := &protos.EnvelopeMsg{
        Id:               42,
        GetHealthRequest: &protos.GetHealthRequest{},
    }
    if err := protomsg.Write(envelopeWriter, req); err != nil {
        return err
    }

    // ...
}

第五,部署者重复WeaveletMsg从 weavelet 中读取 s 直到它收到一个 id -42。每个具有正 id 的 RPCx都会收到一个具有负 id 的回复-x。请注意,在收到对其 RPC 的回复之前,部署者可能会收到来自 weavelet 的其他消息。一个真正的部署者会处理这些消息,但我们的普通部署者会简单地忽略它们。

func run(ctx context.Context, binary string) error {
    // Step 1...
    // Step 2...
    // Step 3...
    // Step 4...

    // Step 5. Receive a reply to the GetHealth RPC, ignoring other messages.
    for {
        var reply protos.WeaveletMsg
        if err := protomsg.Read(envelopeReader, &reply); err != nil {
            return err
        }
        if reply.Id == -42 {
            fmt.Println(prototext.Format(&reply))
            break
        }
    }
    return nil
}

回复看起来像这样:

id: -42
get_health_reply: {
    status: HEALTHY
}

这个简单的部署器演示了部署器和 weavelet 如何通过一对管道发送 protobufs 进行通信。有关更多详细信息,请参阅 runtime.proto

欢迎关注微信公众号,第一时间收到更新推送。

qrcode_for_gh_4ab47aa64a20_258