thrift客户端

(1) 设计

TClient ,
TStandardClient、WrappedTClient 实现了TClient接口的Call方法

(2) demo

https://github.com/weikeqin/thrift-tutorial-go-demo

以client调用add方法为例

调用下游Add()方法代码

func main() {
	// 创建thrift client
	thriftClient := getThriftClient()
	// 创建 calculatorClientProxy   其实是一个代理类(静态代理) 代理了idl定义的所有方法
	// tutorial是由idl生成的
	calculatorClientProxy := tutorial.NewCalculatorClient(thriftClient)
	// 调用Add方法
	sum, _ := calculatorClientProxy.Add(defaultCtx, 1, 2)
	fmt.Print("1+2=", sum, "\n")
}

(2.1) Add()方法定义

// 

(2.2) idl里代理方法

CalculatorClient::Add 实现了 idl里定义的 Add方法, CalculatorClient 可以看做Calculator接口的代理

详细的根据gen命令生成的go代码如下

// gen-go/tutorial/tutorial.go 
package tutorial


// Parameters:
//  - Num1
//  - Num2
func (p *CalculatorClient) Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error) {
  // 业务方法入参包装  `CalculatorAddArgs`实现了`TStruct`接口
  var _args3 CalculatorAddArgs
  _args3.Num1 = num1
  _args3.Num2 = num2
  // 业务返回结果   
  // call的第4个入参类型是`TStruct`,`CalculatorAddResult`实现了`TStruct`接口,可以直接传递
  var _result5 CalculatorAddResult
  // 通用返回结果里的headers
  var _meta4 thrift.ResponseMeta

  // 调用 TStandardClient::Call 方法
  _meta4, _err = p.Client_().Call(ctx, "add", &_args3, &_result5)
  p.SetLastResponseMeta_(_meta4)
  if _err != nil {
    return
  }
  // 返回业务处理结果
  return _result5.GetSuccess(), nil
}

(3) 源码解读-go

TClient

(3.1) TClient

TClient定义了客户端

package thrift
// thrift.client.go 

type TClient interface {
	Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)
}

(3.2) TStandardClient

TStandardClient有3个方法 Send Recv Call

package thrift
// thrift.client.go 

// 
type TStandardClient struct {
	seqId        int32      // 序列Id
	iprot, oprot TProtocol  // 输入传输协议 输出传输协议
}

(3.2.1) 创建NewTStandardClient方法 NewTStandardClient

// TStandardClient 实现了 TClient 接口,使用 Thrift 的标准消息格式。
// 并发使用是不安全的
// 
// TStandardClient implements TClient, and uses the standard message format for Thrift.
// It is not safe for concurrent use.
func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient {
	return &TStandardClient{
		iprot: inputProtocol,
		oprot: outputProtocol,
	}
}

(3.2.2) Call

// param  method  方法名
// param  args    业务入参
// param  result  返回(业务)结果
func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {
    // 序列Id+1  注意:这儿不是并发安全的 
    p.seqId++
	seqId := p.seqId

    // 调用Send方法
	if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {
		return ResponseMeta{}, err
	}

	// method is oneway
	if result == nil {
		return ResponseMeta{}, nil
	}

    // 调用Recv方法,把返回结果写入result
    err := p.Recv(ctx, p.iprot, seqId, method, result)
    // 处理返回headers 
	var headers THeaderMap
	if hp, ok := p.iprot.(*THeaderProtocol); ok {
		headers = hp.transport.readHeaders
	}
	return ResponseMeta{
		Headers: headers,
	}, err
}

从以上的代码可以看到
1、TStandardClient并发使用是不安全的,所以使用的时候得每次新建一个
2、TStandardClient是同步调用

(3.2.3) Send

// client给Server发请求的方法 
func (p *TStandardClient) Send(ctx context.Context, oprot TProtocol, seqId int32, method string, args TStruct) error {

	// 设置headers
	// Set headers from context object on THeaderProtocol
	if headerProt, ok := oprot.(*THeaderProtocol); ok {
		headerProt.ClearWriteHeaders()
		for _, key := range GetWriteHeaderList(ctx) {
			if value, ok := GetHeader(ctx, key); ok {
				headerProt.SetWriteHeader(key, value)
			}
		}
	}


	// 使用配置的传输协议写入数据 
	// method是方法名  CALL是TMessageType对应的枚举  seqId是序列Id 
    // 调用server接口 其实就是给server发一个请求,按照约定的传输协议(TProtocol)和传输方式(TTransport)传输对应的二进制数据 
    // 这个例子里传输协议是 TBinaryProtocol  传输方式是 TSocket
	if err := oprot.WriteMessageBegin(ctx, method, CALL, seqId); err != nil {
		return err
	}
	if err := args.Write(ctx, oprot); err != nil {
		return err
	}
	if err := oprot.WriteMessageEnd(ctx); err != nil {
		return err
    }
    
    // 传输协议 调 传输方式 flush 
	return oprot.Flush(ctx)
}

可以看到,client 的send方法主要作用有2个
1、(在传输协议是THeaderProtocol的情况下)设置headers
2、按照约定的传输一些写数据,并flush

(3.2.4) Recv

// 接收请求方法
func (p *TStandardClient) Recv(ctx context.Context, iprot TProtocol, seqId int32, method string, result TStruct) error {
    // 
	rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin(ctx)
	if err != nil {
		return err
	}

    // 消息校验 校验方法名、序列Id、消息类型
	if method != rMethod {  // 方法和消息里的方法不一致,抛异常
		return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))
	} else if seqId != rSeqId { // 序列Id不一致,抛异常 
		return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))
	} else if rTypeId == EXCEPTION { // 异常类消息,抛异常
		var exception tApplicationException
		if err := exception.Read(ctx, iprot); err != nil {
			return err
		}

		if err := iprot.ReadMessageEnd(ctx); err != nil {
			return err
		}

		return &exception
	} else if rTypeId != REPLY { // 返回结果里消息类型不是回复消息,抛异常
		return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))
	}

    // 读取消息
	if err := result.Read(ctx, iprot); err != nil {
		return err
	}

    // 
	return iprot.ReadMessageEnd(ctx)
}

(3.3) WrappedTClient

Wrapper提供了一种包装机制,使得在执行某方法前先执行Wrapper
因此可以在客户端和服务器做很多功能:接口监控、熔断限流、鉴权、Filter等。

// file thrift/middleware.go
package thrift

// WrappedTClient is a convenience struct that implements the TClient interface
// using inner Wrapped function.
//
// This is provided to aid in developing ClientMiddleware.
type WrappedTClient struct {
	Wrapped func(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)
}

(3.3.1) 新建WrappedTClient

// file thrift/middleware.go
package thrift

// 验证 WrappedTClient 实现了 TClient
// verify that WrappedTClient implements TClient
var (
	_ TClient = WrappedTClient{}
	_ TClient = (*WrappedTClient)(nil)
)


// WrappedTClient 是一个使用内部 Wrapped 函数实现 TClient 接口的便利结构。
// 这是为了帮助开发 ClientMiddleware。
// 
// WrapClient wraps the given TClient in the given middlewares.
//
// Middlewares will be called in the order that they are defined:
//
//		1. Middlewares[0]
//		2. Middlewares[1]
//		...
//		N. Middlewares[n]
func WrapClient(client TClient, middlewares ...ClientMiddleware) TClient {
	// 反向添加中间件,因此列表中的第一个是最外面的。
	// Add middlewares in reverse so the first in the list is the outermost.
	for i := len(middlewares) - 1; i >= 0; i-- {
		// middlewares[i]是ClientMiddleware,是一个函数,client是对应的入参 
		client = middlewares[i](client)
	}
	return client
}

(3.3.2) Call

// file thrift/middleware.go
package thrift

// Call 通过调用 c.Wrapped的TClient接口实现 来方法调用 并返回结果 。
// Call implements the TClient interface by calling and returning c.Wrapped.
func (c WrappedTClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {
	// 
	return c.Wrapped(ctx, method, args, result)
}

(3.4) WrappedTClient Demo

func main() {
	var thriftClient thrift.TClient
	thriftClient = getThriftClient()

	// 扩展点:日志中间件  
	clientMiddleware := clientLoggingMiddleware()
	// 包装后的client
	wrapClient := thrift.WrapClient(thriftClient, clientMiddleware)

	// 创建 calculatorClient   tutorial是由idl生成的
	calculatorClient := tutorial.NewCalculatorClient(wrapClient)
	// 调用Add方法
	sum, _ := calculatorClient.Add(defaultCtx, 1, 2)
	fmt.Print("1+2=", sum, "\n")
}
// 生成一个日志中间件 Demo用  
func clientLoggingMiddleware() thrift.ClientMiddleware {

	clientMiddleware := func(next thrift.TClient) thrift.TClient {
		wrappedTClient := thrift.WrappedTClient{
			Wrapped: func(ctx context.Context, method string, args, result thrift.TStruct) (thrift.ResponseMeta, error) {
				// 调用前打印 方法名及参数
				log.Printf("Before: %q    Args: %#v ", method, args)
				// 方法调用
				headers, err := next.Call(ctx, method, args, result)
				// 调用后打印
				log.Printf("After: %q    Result: %#v", method, result)
				if err != nil {
					log.Printf("Error: %v", err)
				}
				return headers, err
			},
		}
		return wrappedTClient
	}

	return clientMiddleware
}

问题

(1) clinet write tcp broken pipe

17:26:38.879  errmsg=write tcp 127.0.0.1:57899->127.0.0.1:8081: write: broken pipe

17:26:38.884  Error while flushing write buffer of size 287 to transport, only wrote 0 bytes: write tcp 127.0.0.1:59781->127.0.0.1:8081: write: broken pipe

17:26:43.220  errmsg=write tcp 127.0.0.1:59781->127.0.0.1:8081: write: broken pipe

client建立连接后,server读数据超时断开连接,client再写数据提示 broken pipe

server write tcp broken pipe

errmsg=write tcp 10.157.121.37:8300->10.159.184.191:57066: write: broken pipe||timeout=0s
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:58842: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50344: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:38252: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22356: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50002: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:32572: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59450: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59320: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:41668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:36668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:60856: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:49126: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:30506: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:55618: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22438: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:43892: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:26776: write: broken pipe

client调用超时,server处理请求后写入数据发送,但是这个时候client已经关闭连接,所以报错 write: broken pipe

参考资料

[1] github-thrift-0.16.0-go
[2] Golang微服务:Micro限流、熔断