thrift 传输方式(Transport)

(1) 传输方式(Transport)作用

传输方式(Transport)作为rpc框架接收报文的入口,提供各种底层实现如socket创建、读写、接收连接等。
同时实现各种复写传输层包括http、framed、buffered、压缩传输等。

(1.1) 支持的传输方式

thrift支持多种传输方式

传输方式 特点
TSocket 阻塞型 socket,用于客户端,采用系统函数 read 和 write 进行读写数据。
TServerSocket 非阻塞型 socket,用于服务器端,accecpt 到的 socket 类型都是 TSocket(即阻塞型 socket)。
TBufferedTransport
TFramedTransport
TMemoryBuffer
TFileTransport
TFDTransport
TSimpleFileTransport
TZlibTransport
TSSLSocket
TSSLServerSocket

(1.2) 传输方式提供的能力

能力 Java TTransport go TTransport
传输是否打开? TTransport::isOpen() IsOpen()
有更多数据要读取? TTransport::peek() Open()
打开传输进行读/写 TTransport::open() -
关闭传输 TTransport::close() Closer::Close() (组合自io.Closer)
从通道中读取字节序列到缓冲区 read(ByteBuffer dst)
从offset开始,将len个字节读入缓冲区buf。 read(byte[] buf, int off, int len) Reader::Read(p []byte) (组合自io.Reader)
从传输中读取所有len个字节。 readAll(byte[] buf, int off, int len) -
将缓冲区数据写到输出 write(byte[] buf) Writer::Write(p []byte) (n int, err error) (组合自io.Writer)
从缓冲区写出len字节数据 write(byte[] buf, int off, int len) -
将字节序列写到缓冲区 write(ByteBuffer src) -
清空传输缓冲区 flush() ContextFlusher::Flush(ctx context.Context) (组合自ContextFlusher)
直接访问协议的底层缓冲区 getBuffer() -
返回底层缓冲区中的索引 getBufferPosition() -
获取底层缓冲区中剩余的字节数。 getBytesRemainingInBuffer() ReadSizeProvider::RemainingBytes() (组合自ReadSizeProvider)
从底层缓冲区消费len字节 consumeBuffer(int len) -
获取传输配置 getConfiguration() -
更新消息大小 updateKnownMessageSize(long size) -
校验是否可以读取数据 checkReadBytesAvailable(long numBytes) -

(2) 源码解读-go

thrift 源码版本 0.16.0
https://github.com/apache/thrift/tree/0.16.0/lib/go/thrift

(2.1) TTransport

// Encapsulates the I/O layer
type TTransport interface {
	io.ReadWriteCloser
	Flusher
	ReadSizeProvider

	// Opens the transport for communication
	Open() error

	// Returns true if the transport is open
	IsOpen() bool
}
package io

// ReadWriteCloser is the interface that groups the basic Read, Write and Close methods.
type ReadWriteCloser interface {
	Reader
	Writer
	Closer
}

type Reader interface {
	Read(p []byte) (n int, err error)
}

type Writer interface {
	Write(p []byte) (n int, err error)
}

type Closer interface {
	Close() error
}
package thrift

type ContextFlusher interface {
	Flush(ctx context.Context) (err error)
}

type ReadSizeProvider interface {
	RemainingBytes() (num_bytes uint64)
}

(2.2) 客户端阻塞型socket (TSocket)

package thrift

// TSocket 实现了 TTransport 接口
type TSocket struct {
	conn *socketConn      // socketConn是对net.Conn的包装 
	addr net.Addr         // 网络地址 地址类型(tcp) 地址描述(192.0.2.1:25)
	cfg  *TConfiguration  // 传输配置 
}

创建TSocket的方式

// 
// Deprecated: Use NewTSocketConf instead.
func NewTSocket(hostPort string) (*TSocket, error) {
	return NewTSocketConf(hostPort, &TConfiguration{
		noPropagation: true,
	}), nil
}

// 
// NewTSocketConf creates a net.Conn-backed TTransport, given a host and port.
//
// Example:
//
//     trans, err := thrift.NewTSocketConf("localhost:9090", &TConfiguration{
//         ConnectTimeout: time.Second, // Use 0 for no timeout
//         SocketTimeout:  time.Second, // Use 0 for no timeout
//     })
func NewTSocketConf(hostPort string, conf *TConfiguration) *TSocket {
	return NewTSocketFromAddrConf(tcpAddr(hostPort), conf)
}

// Deprecated: Use NewTSocketConf instead.
func NewTSocketTimeout(hostPort string, connTimeout time.Duration, soTimeout time.Duration) (*TSocket, error) {
	return NewTSocketConf(hostPort, &TConfiguration{
		ConnectTimeout: connTimeout,
		SocketTimeout:  soTimeout,

		noPropagation: true,
	}), nil
}

// 根据地址+配置创建 TSocket
// 最终都是通过 NewTSocketFromAddrConf 创建 TSocket
// NewTSocketFromAddrConf creates a TSocket from a net.Addr
func NewTSocketFromAddrConf(addr net.Addr, conf *TConfiguration) *TSocket {
	return &TSocket{
		addr: addr,
		cfg:  conf,
	}
}

// Deprecated: Use NewTSocketFromAddrConf instead.
func NewTSocketFromAddrTimeout(addr net.Addr, connTimeout time.Duration, soTimeout time.Duration) *TSocket {
	return NewTSocketFromAddrConf(addr, &TConfiguration{
		ConnectTimeout: connTimeout,
		SocketTimeout:  soTimeout,

		noPropagation: true,
	})
}

// NewTSocketFromConnConf creates a TSocket from an existing net.Conn.
func NewTSocketFromConnConf(conn net.Conn, conf *TConfiguration) *TSocket {
	return &TSocket{
		conn: wrapSocketConn(conn),
		addr: conn.RemoteAddr(),
		cfg:  conf,
	}
}

// Deprecated: Use NewTSocketFromConnConf instead.
func NewTSocketFromConnTimeout(conn net.Conn, socketTimeout time.Duration) *TSocket {
	return NewTSocketFromConnConf(conn, &TConfiguration{
		SocketTimeout: socketTimeout,

		noPropagation: true,
	})
}

open()

// Connects the socket, creating a new socket object if necessary.
func (p *TSocket) Open() error {
	// 创建前校验  校验状态是否已经开启 校验参数是否合法 
	// 校验是否已经可用
	if p.conn.isValid() {
		// 如果已经可用,抛异常
		return NewTTransportException(ALREADY_OPEN, "Socket already connected.")
	}
	if p.addr == nil {
		return NewTTransportException(NOT_OPEN, "Cannot open nil address.")
	}
	if len(p.addr.Network()) == 0 {
		return NewTTransportException(NOT_OPEN, "Cannot open bad network name.")
	}
	if len(p.addr.String()) == 0 {
		return NewTTransportException(NOT_OPEN, "Cannot open bad address.")
	}
	var err error
	// 创建
	if p.conn, err = createSocketConnFromReturn(net.DialTimeout(
		p.addr.Network(),
		p.addr.String(),
		p.cfg.GetConnectTimeout(),
	)); err != nil {
		return &tTransportException{
			typeId: NOT_OPEN,
			err:    err,
			msg:    err.Error(),
		}
	}
	// 更新socket地址
	p.addr = p.conn.RemoteAddr()
	return nil
}

socketConn

p.conn.isValid()

// socketConn is a wrapped net.Conn that tries to do connectivity check.
type socketConn struct {
	net.Conn

	buffer [1]byte
	closed int32
}

// isValid checks whether there's a valid connection.
//
// It's nil safe, and returns false if sc itself is nil, or if the underlying
// connection is nil.
//
// It's the same as the previous implementation of TSocket.IsOpen and
// TSSLSocket.IsOpen before we added connectivity check.
func (sc *socketConn) isValid() bool {
	return sc != nil && sc.Conn != nil && atomic.LoadInt32(&sc.closed) == 0
}


// createSocketConnFromReturn is a language sugar to help create socketConn from
// return values of functions like net.Dial, tls.Dial, net.Listener.Accept, etc.
func createSocketConnFromReturn(conn net.Conn, err error) (*socketConn, error) {
	if err != nil {
		return nil, err
	}
	return &socketConn{
		Conn: conn,
	}, nil
}

net.Dialer

// 拨号器包含连接到地址的选项。
// 每个字段的零值相当于没有该选项的拨号。因此,使用 Dialer 的零值拨号等同于调用 Dial 函数。
// 同时调用 Dialer 的方法是安全的。
// 
type Dialer struct {
	// 超时是拨号等待连接完成的最长时间。如果还设置了截止日期,它可能会提前失败。// 默认是没有超时。
	// 当使用 TCP 并拨打具有多个 IP 地址的主机名时,超时可能会在它们之间划分。
	// 无论有没有超时,操作系统都可以强加自己的早期超时。例如,TCP 超时通常在 3 分钟左右。
	Timeout time.Duration

	// 截止日期是拨号失败的绝对时间点。如果设置了 Timeout,它可能会提前失败。
 	// 零表示没有截止日期,或者与超时选项一样取决于操作系统。
	Deadline time.Time

	// LocalAddr is the local address to use when dialing an
	// address. The address must be of a compatible type for the
	// network being dialed.
	// If nil, a local address is automatically chosen.
	LocalAddr Addr

	// DualStack previously enabled RFC 6555 Fast Fallback
	// support, also known as "Happy Eyeballs", in which IPv4 is
	// tried soon if IPv6 appears to be misconfigured and
	// hanging.
	//
	// Deprecated: Fast Fallback is enabled by default. To
	// disable, set FallbackDelay to a negative value.
	DualStack bool

	// FallbackDelay specifies the length of time to wait before
	// spawning a RFC 6555 Fast Fallback connection. That is, this
	// is the amount of time to wait for IPv6 to succeed before
	// assuming that IPv6 is misconfigured and falling back to
	// IPv4.
	//
	// If zero, a default delay of 300ms is used.
	// A negative value disables Fast Fallback support.
	FallbackDelay time.Duration

	// KeepAlive specifies the interval between keep-alive
	// probes for an active network connection.
	// If zero, keep-alive probes are sent with a default value
	// (currently 15 seconds), if supported by the protocol and operating
	// system. Network protocols or operating systems that do
	// not support keep-alives ignore this field.
	// If negative, keep-alive probes are disabled.
	KeepAlive time.Duration

	// Resolver optionally specifies an alternate resolver to use.
	Resolver *Resolver

	// Cancel is an optional channel whose closure indicates that
	// the dial should be canceled. Not all types of dials support
	// cancellation.
	//
	// Deprecated: Use DialContext instead.
	Cancel <-chan struct{}

	// If Control is not nil, it is called after creating the network
	// connection but before actually dialing.
	//
	// Network and address parameters passed to Control method are not
	// necessarily the ones passed to Dial. For example, passing "tcp" to Dial
	// will cause the Control function to be called with "tcp4" or "tcp6".
	Control func(network, address string, c syscall.RawConn) error
}

// 拨号连接到指定网络上的地址。
//
// 有关网络和地址参数的说明,请参见 func Dial。
//
// Dial 在内部使用 context.Background;要指定上下文,请使用 DialContext。
func (d *Dialer) Dial(network, address string) (Conn, error) {
	return d.DialContext(context.Background(), network, address)
}

(2.3) TFramedTransport

package thrift
// framed_transport.go 

type TFramedTransport struct {
	transport TTransport   // 

	cfg *TConfiguration    //

	writeBuf *bytes.Buffer //

	reader  *bufio.Reader  //
	readBuf *bytes.Buffer  //

	buffer [4]byte         // 
}

(2.4) 服务端非阻塞型socket(TServerSocket)

// thrift/server_socket.go
package thrift

// TServerSocket 
// 实现了 TServerTransport 接口
type TServerSocket struct {
	listener      net.Listener   // 监听
	addr          net.Addr       // 地址
	clientTimeout time.Duration  // 超时时间

	// Protects the interrupted value to make it thread safe.
	mu          sync.RWMutex  // 读写锁
	interrupted bool          / 是否中断
}
// thrift/server_transport.go
package thrift

// 服务器传输。提供客户端传输的对象。
// 
// Server transport. Object which provides client transports.
type TServerTransport interface {
	Listen() error                 // 监听
	Accept() (TTransport, error)   // 接收请求
	Close() error                  // 关闭

	// 可选方法实现。 向服务器传输发出信号,表明它应该中断当前被阻止的任何 accept() 或 listen() 。
	// 此方法(如果实现)必须是线程安全的,因为它可能从与其他 TServerTransport 方法不同的线程上下文中调用。
	// 
	// Optional method implementation. This signals to the server transport
	// that it should break out of any accept() or listen() that it is currently
	// blocked on. This method, if implemented, MUST be thread safe, as it may
	// be called from a different thread context than the other TServerTransport
	// methods.
	Interrupt() error  // 中断 
}

创建TServerSocket的方法

// 根据地址创建TServerSocket
func NewTServerSocket(listenAddr string) (*TServerSocket, error) {
	return NewTServerSocketTimeout(listenAddr, 0)
}

// 根据地址和超时时间创建TServerSocket
func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) {
	// 解析tcp地址
	addr, err := net.ResolveTCPAddr("tcp", listenAddr)
	if err != nil {
		return nil, err
	}
	return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil
}

// 根据地址创建TServerSocket
// Creates a TServerSocket from a net.Addr
func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket {
	return &TServerSocket{addr: addr, clientTimeout: clientTimeout}
}

监听请求 Listen()

func (p *TServerSocket) Listen() error {
	p.mu.Lock() // 加互斥锁 保护监听资源
	defer p.mu.Unlock() // 互斥锁解锁
	// 检查socket是否还在监听 如果在监听直接返回
	if p.IsListening() {
		return nil
	}
	// 监听指定地址  这里是监听 tcp localhost:9090 
	l, err := net.Listen(p.addr.Network(), p.addr.String())
	if err != nil {
		return err
	}
	// 更新监听对象
	p.listener = l
	return nil
}
// 检查socket是否还在监听
// Checks whether the socket is listening.
func (p *TServerSocket) IsListening() bool {
	return p.listener != nil
}

接收请求 Accept()

func (p *TServerSocket) Accept() (TTransport, error) {
	p.mu.RLock() // (interrupted资源) 加写锁 
	interrupted := p.interrupted // 更新interrupted值
	p.mu.RUnlock() // 释放(interrupted资源)写锁

	// 如果已经中断,抛异常
	if interrupted {
		return nil, errTransportInterrupted
	}

	p.mu.Lock() // (listener资源)加互斥锁
	listener := p.listener // 更新listener值
	p.mu.Unlock() // (listener资源)解锁
	if listener == nil {
		return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
	}

	// 接收请求  创建socket、建立连接  
	conn, err := listener.Accept()
	if err != nil {
		return nil, NewTTransportExceptionFromError(err)
	}
	// 返回创建的连接
	return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil
}

Open()

// 连接socket,创建一个socket
// Connects the socket, creating a new socket object if necessary.
func (p *TServerSocket) Open() error {
	p.mu.Lock() // (listener资源)加互斥锁
	defer p.mu.Unlock() // (listener资源)解锁 
	// 已经在监听
	if p.IsListening() {
		return NewTTransportException(ALREADY_OPEN, "Server socket already open")
	}
	// 创建监听
	if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil {
		return err
	} else {
		// 更新监听
		p.listener = l
	}
	return nil
}

Close()

func (p *TServerSocket) Close() error {
	var err error
	p.mu.Lock()
	if p.IsListening() {
		err = p.listener.Close()
		p.listener = nil
	}
	p.mu.Unlock()
	return err
}

Interrupt()

func (p *TServerSocket) Interrupt() error {
	p.mu.Lock()
	p.interrupted = true
	p.mu.Unlock()
	p.Close()

	return nil
}

(3) 源码解读-Java

thrift 源码版本 0.16.0
https://github.com/apache/thrift/tree/0.16.0/lib/java/thrift

参考

[1] 和 Thrift 的一场美丽邂逅