写给Go开发者的Tars教程-支持超时传递

2024-01-14
3分钟阅读时长

本篇为【写给go开发者的Tars教程】系列第六篇

第一篇:Tars协议基础

第二篇:通信模式

第三篇:拦截器

第四篇:错误处理

第五篇:context/status

第六篇:超时控制

第六篇:续集支持超时传递

本系列将持续更新,欢迎关注👏获取实时通知


接上篇超时控制中提到TarsGo并不支持超时传递,下面我们将对TarsGo进行改造使其支持超时传递能力。

回顾

超时传递

当一个正常的请求会涉及到多个服务的调用时,从源头开始一个服务端不仅为上游服务提供服务,也作为下游的客户端

image-20240108232126522

如上的链路,如果当请求到达某一服务时,对于服务A来说已经超时了,那么就没有必要继续把请求传递下去了。这样可以最大限度的避免后续服务的资源浪费,提高系统的整体性能。

下面将为TarsGo实现了这一特性,需要对TarsGo框架的客户端和服务端分别进行改造,改造完成后我们业务层要做的就是不断的把context.Context传下去,如下代码所示:

// 服务A
func main(){
    ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
    client.ServiceB(ctx)
}
// 服务B
func ServiceB(ctx context.Context){
    client.ServiceC(ctx)
}
// 服务C
func ServiceC(ctx context.Context){
    client.ServiceD(ctx)
}
// 服务D
func ServiceD(ctx context.Context){
    done := make(chan int, 1)
    go func() {
        // 业务逻辑
        done <- 1 // 处理完成
    }()
    select {
    case <-ctx.Done():
       	// 处理超时
    case <- done:
        // 处理完成
    }
}

在每一次的context.Context透传中, timeout都会减去在本进程中耗时,导致这个 timeout 传递到下一个 TarsGo 服务端时变短,当在某一个进程中已经超时,请求不会再继续传递,这样即实现了所谓的 超时传递

客户端实现

原来超时控制实现代码:

ctx := current.ContextWithClientCurrent(context.Background())
current.SetClientTimeout(ctx, 10000)
order, err := client.GetOrderWithContext(ctx, "1")
if err != nil {
    panic(err)
}

期望更符合go语言的超时控制如下:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
order, err := client.GetOrderWithContext(ctx, "1")
if err != nil {
    panic(err)
}

分析tars2go生成的客户端代码:

// OrderManagement struct
type OrderManagement struct {
	servant m.Servant // servant 是一个接口,具体的实现是tars.ServantProxy
}

// GetOrderWithContext is the proxy function for the method defined in the tars file, with the context
func (obj *OrderManagement) GetOrderWithContext(tarsCtx context.Context, orderId string, opts ...map[string]string) (ret Order, err error) {
	// ......
	tarsResp := new(requestf.ResponsePacket)
    // tars框架底层代码,我们要支持超时传递主要改造就是此方法
	err = obj.servant.TarsInvoke(tarsCtx, 0, "getOrder", buf.ToBytes(), statusMap, contextMap, tarsResp)
	if err != nil {
		return ret, err
	}
	// ......
	return ret, nil
}

主要对tars/servant.go文件进行如下改造,此改造同时兼容了此前的超时控制参数

// TarsInvoke is used for client invoking server.
func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte,
	sFuncName string,
	buf []byte,
	status map[string]string,
	reqContext map[string]string,
	resp *requestf.ResponsePacket) error {
	defer CheckPanic()

	// 将ctx中的dyeing信息传入到request中

	// 将ctx中的trace信息传入到request中

	req := requestf.RequestPacket{
		// ......
		ITimeout:     int32(s.timeout), // 默认超时时间
		// ......
	}
	msg := &Message{Req: &req, Ser: s, Resp: resp}
	msg.Init()

	// ......

    // 保留兼容:获取此前通过current.SetClientTimeout(ctx, 10000)配置的超时时间
	timeout := time.Duration(s.timeout) * time.Millisecond
	if ok, to, isTimeout := current.GetClientTimeout(ctx); ok && isTimeout {
		timeout = time.Duration(to) * time.Millisecond
		req.ITimeout = int32(to)
	}
	// 重点:超时传递,判断传入的context.Context是否支持超时控制
	if dl, ok := ctx.Deadline(); ok {
		timeout = time.Until(dl) // 计算剩余超时时间
		req.ITimeout = int32(timeout / time.Millisecond) // 替换默认超时时间
	} else {
        // 使用老的超时时间参数生成新的 context.Context
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	// ......
	return err
}

func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.Duration) error {
	// ... 
	select {
	case <-ctx.Done(): // 原来此处是:rtimer.After(timeout),替换为 context.Context
		// 请求超时
	case msg.Resp = <-readCh:
		// ... 
	}
	return nil
}

以上就是客户端的改造,改造的代码很早就可以支持使用go推荐的使用context.Context的超时控制了。经过这个改造支持,我们原来使用过滤器控制修改超时的方法也需要做调整。

过滤器控制超时

// 此前的实现
func orderClientFilter(next tars.ClientFilter) tars.ClientFilter {
	return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
        timeout = 100*time.Second
		// Invoking the remote method
		err = next(ctx, msg, invoke, timeout)
		return err
	}
}

// 改造之后
func orderClientFilter(next tars.ClientFilter) tars.ClientFilter {
	return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
 defer cancel()
		// Invoking the remote method
		err = next(ctx, msg, invoke, timeout)
		return err
	}
}

PR变动对照

客户端改造

下面开始改造服务器端实现。

服务端实现

此前服务端没有对客户端传过来的超时时间做传递处理,只对handletimeout参数做了出来,且默认此配置值为0,也就是没有超时时间。同时 perf: optimize server-side handling of timeout control PR对handletimeout做了向下传递。接下来我们就让服务端支持客户端携带的超时时间的向下传递。

tars/tarsprotocol.go

// Invoke puts the request as []byte and call the dispatcher, and then return the response as []byte.
func (s *Protocol) Invoke(ctx context.Context, req []byte) (rsp []byte) {
	defer CheckPanic()
	reqPackage := requestf.RequestPacket{}
	rspPackage := requestf.ResponsePacket{}
	// ......
    // 获取数据包接收到的时间
	recvPkgTs, ok := current.GetRecvPkgTsFromContext(ctx)
	if !ok {
		recvPkgTs = time.Now().UnixNano() / 1e6
	}

	// timeout delivery
	now := time.Now().UnixNano() / 1e6
	if reqPackage.ITimeout > 0 {
		sub := now - recvPkgTs // 计算协程调度延迟时间
		timeout := int64(reqPackage.ITimeout) - sub // 超时时间减去协程调度延迟时间
		var cancel context.CancelFunc
        // 使用超时时间构造带超时时间的 context.Context
		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
		defer cancel()
	}
	// ......
	select {
	case <-ctx.Done():
		rspPackage.IRet = basef.TARSSERVERQUEUETIMEOUT
		rspPackage.SResultDesc = "server invoke timeout"
		ip, _ := current.GetClientIPFromContext(ctx)
		port, _ := current.GetClientPortFromContext(ctx)
		TLOG.Errorf("handle queue timeout, obj:%s, func:%s, recv time:%d, now:%d, timeout:%d, cost:%d,  addr:(%s:%s), reqId:%d, err: %v",
			reqPackage.SServantName, reqPackage.SFuncName, recvPkgTs, now, reqPackage.ITimeout, now-recvPkgTs, ip, port, reqPackage.IRequestId, ctx.Err())
	default:
        // not tars_ping, normal business call branch
		if reqPackage.SFuncName != "tars_ping" { 
			// 调用服务端代码,进行业务逻辑处理 .....
		}
	}

	return s.rsp2Byte(&rspPackage)
}

PR变动对照

服务端

减小数据包接收时间失真

把携带数据包接收时间的context.Context创建提取到协程外,主要涉及tars/transport/tcphandler.gotars/transport/udphandler.go文件,如下图所示:

减小数据包接收时间失真

总结

经过上面对客户端和服务端的相关调整,TarsGo现已经完成支持超时传递能力,欢迎升级试用。完整PR:Implementing Inter-Service Timeout Propagation using context.Context

参考资料

gRPC 系列——grpc超时传递原理: https://xiaomi-info.github.io/2019/12/30/grpc-deadline/


👇 欢迎关注👇

关注公众号获得更多精彩文章

公众号:程序员大兵