写给Go开发者的Tars教程-支持超时传递
本篇为【写给go开发者的Tars教程】系列第六篇
第一篇:Tars协议基础
第二篇:通信模式
第三篇:拦截器
第四篇:错误处理
第五篇:context/status
第六篇:超时控制
第六篇:续集支持超时传递
本系列将持续更新,欢迎关注👏获取实时通知
接上篇超时控制中提到
TarsGo
并不支持超时传递,下面我们将对TarsGo
进行改造使其支持超时传递能力。
回顾
超时传递
当一个正常的请求会涉及到多个服务的调用时,从源头开始一个服务端不仅为上游服务提供服务,也作为下游的客户端
如上的链路,如果当请求到达某一服务时,对于服务A来说已经超时了,那么就没有必要继续把请求传递下去了。这样可以最大限度的避免后续服务的资源浪费,提高系统的整体性能。
下面将为TarsGo
实现了这一特性,需要对TarsGo
框架的客户端和服务端分别进行改造,改造完成后我们业务层要做的就是不断的把context.Context
传下去,如下代码所示:
// 服务A
func main(){
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
client.ServiceB(ctx)
}
// 服务B
func ServiceB(ctx context.Context){
client.ServiceC(ctx)
}
// 服务C
func ServiceC(ctx context.Context){
client.ServiceD(ctx)
}
// 服务D
func ServiceD(ctx context.Context){
done := make(chan int, 1)
go func() {
// 业务逻辑
done <- 1 // 处理完成
}()
select {
case <-ctx.Done():
// 处理超时
case <- done:
// 处理完成
}
}
在每一次的context.Context
透传中, timeout都会减去在本进程中耗时,导致这个 timeout 传递到下一个 TarsGo 服务端时变短,当在某一个进程中已经超时,请求不会再继续传递,这样即实现了所谓的 超时传递
客户端实现
原来超时控制实现代码:
ctx := current.ContextWithClientCurrent(context.Background())
current.SetClientTimeout(ctx, 10000)
order, err := client.GetOrderWithContext(ctx, "1")
if err != nil {
panic(err)
}
期望更符合go
语言的超时控制如下:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
order, err := client.GetOrderWithContext(ctx, "1")
if err != nil {
panic(err)
}
分析tars2go
生成的客户端代码:
// OrderManagement struct
type OrderManagement struct {
servant m.Servant // servant 是一个接口,具体的实现是tars.ServantProxy
}
// GetOrderWithContext is the proxy function for the method defined in the tars file, with the context
func (obj *OrderManagement) GetOrderWithContext(tarsCtx context.Context, orderId string, opts ...map[string]string) (ret Order, err error) {
// ......
tarsResp := new(requestf.ResponsePacket)
// tars框架底层代码,我们要支持超时传递主要改造就是此方法
err = obj.servant.TarsInvoke(tarsCtx, 0, "getOrder", buf.ToBytes(), statusMap, contextMap, tarsResp)
if err != nil {
return ret, err
}
// ......
return ret, nil
}
主要对tars/servant.go
文件进行如下改造,此改造同时兼容了此前的超时控制参数
// TarsInvoke is used for client invoking server.
func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte,
sFuncName string,
buf []byte,
status map[string]string,
reqContext map[string]string,
resp *requestf.ResponsePacket) error {
defer CheckPanic()
// 将ctx中的dyeing信息传入到request中
// 将ctx中的trace信息传入到request中
req := requestf.RequestPacket{
// ......
ITimeout: int32(s.timeout), // 默认超时时间
// ......
}
msg := &Message{Req: &req, Ser: s, Resp: resp}
msg.Init()
// ......
// 保留兼容:获取此前通过current.SetClientTimeout(ctx, 10000)配置的超时时间
timeout := time.Duration(s.timeout) * time.Millisecond
if ok, to, isTimeout := current.GetClientTimeout(ctx); ok && isTimeout {
timeout = time.Duration(to) * time.Millisecond
req.ITimeout = int32(to)
}
// 重点:超时传递,判断传入的context.Context是否支持超时控制
if dl, ok := ctx.Deadline(); ok {
timeout = time.Until(dl) // 计算剩余超时时间
req.ITimeout = int32(timeout / time.Millisecond) // 替换默认超时时间
} else {
// 使用老的超时时间参数生成新的 context.Context
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// ......
return err
}
func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time.Duration) error {
// ...
select {
case <-ctx.Done(): // 原来此处是:rtimer.After(timeout),替换为 context.Context
// 请求超时
case msg.Resp = <-readCh:
// ...
}
return nil
}
以上就是客户端的改造,改造的代码很早就可以支持使用go
推荐的使用context.Context
的超时控制了。经过这个改造支持,我们原来使用过滤器控制修改超时的方法也需要做调整。
过滤器控制超时
// 此前的实现
func orderClientFilter(next tars.ClientFilter) tars.ClientFilter {
return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
timeout = 100*time.Second
// Invoking the remote method
err = next(ctx, msg, invoke, timeout)
return err
}
}
// 改造之后
func orderClientFilter(next tars.ClientFilter) tars.ClientFilter {
return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()
// Invoking the remote method
err = next(ctx, msg, invoke, timeout)
return err
}
}
PR变动对照
下面开始改造服务器端实现。
服务端实现
此前服务端没有对客户端传过来的超时时间做传递处理,只对handletimeout
参数做了出来,且默认此配置值为0,也就是没有超时时间。同时 perf: optimize server-side handling of timeout control
PR对handletimeout
做了向下传递。接下来我们就让服务端支持客户端携带的超时时间的向下传递。
tars/tarsprotocol.go
// Invoke puts the request as []byte and call the dispatcher, and then return the response as []byte.
func (s *Protocol) Invoke(ctx context.Context, req []byte) (rsp []byte) {
defer CheckPanic()
reqPackage := requestf.RequestPacket{}
rspPackage := requestf.ResponsePacket{}
// ......
// 获取数据包接收到的时间
recvPkgTs, ok := current.GetRecvPkgTsFromContext(ctx)
if !ok {
recvPkgTs = time.Now().UnixNano() / 1e6
}
// timeout delivery
now := time.Now().UnixNano() / 1e6
if reqPackage.ITimeout > 0 {
sub := now - recvPkgTs // 计算协程调度延迟时间
timeout := int64(reqPackage.ITimeout) - sub // 超时时间减去协程调度延迟时间
var cancel context.CancelFunc
// 使用超时时间构造带超时时间的 context.Context
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
defer cancel()
}
// ......
select {
case <-ctx.Done():
rspPackage.IRet = basef.TARSSERVERQUEUETIMEOUT
rspPackage.SResultDesc = "server invoke timeout"
ip, _ := current.GetClientIPFromContext(ctx)
port, _ := current.GetClientPortFromContext(ctx)
TLOG.Errorf("handle queue timeout, obj:%s, func:%s, recv time:%d, now:%d, timeout:%d, cost:%d, addr:(%s:%s), reqId:%d, err: %v",
reqPackage.SServantName, reqPackage.SFuncName, recvPkgTs, now, reqPackage.ITimeout, now-recvPkgTs, ip, port, reqPackage.IRequestId, ctx.Err())
default:
// not tars_ping, normal business call branch
if reqPackage.SFuncName != "tars_ping" {
// 调用服务端代码,进行业务逻辑处理 .....
}
}
return s.rsp2Byte(&rspPackage)
}
PR变动对照
减小数据包接收时间失真
把携带数据包接收时间的context.Context
创建提取到协程外,主要涉及tars/transport/tcphandler.go
和tars/transport/udphandler.go
文件,如下图所示:
总结
经过上面对客户端和服务端的相关调整,TarsGo
现已经完成支持超时传递能力,欢迎升级试用。完整PR:Implementing Inter-Service Timeout Propagation using context.Context
参考资料
gRPC 系列——grpc超时传递原理: https://xiaomi-info.github.io/2019/12/30/grpc-deadline/
👇 欢迎关注👇