thrift 传输方式(Transport)
(1) 传输方式(Transport)作用
传输方式(Transport)作为rpc框架接收报文的入口,提供各种底层实现如socket创建、读写、接收连接等。
同时实现各种复写传输层包括http、framed、buffered、压缩传输等。
(1.1) 支持的传输方式
thrift支持多种传输方式
传输方式 | 特点 |
---|---|
TSocket | 阻塞型 socket,用于客户端,采用系统函数 read 和 write 进行读写数据。 |
TServerSocket | 非阻塞型 socket,用于服务器端,accecpt 到的 socket 类型都是 TSocket(即阻塞型 socket)。 |
TBufferedTransport | |
TFramedTransport | |
TMemoryBuffer | |
TFileTransport | |
TFDTransport | |
TSimpleFileTransport | |
TZlibTransport | |
TSSLSocket | |
TSSLServerSocket |
(1.2) 传输方式提供的能力
能力 | Java TTransport | go TTransport |
---|---|---|
传输是否打开? | TTransport::isOpen() | IsOpen() |
有更多数据要读取? | TTransport::peek() | Open() |
打开传输进行读/写 | TTransport::open() | - |
关闭传输 | TTransport::close() | Closer::Close() (组合自io.Closer) |
从通道中读取字节序列到缓冲区 | read(ByteBuffer dst) | |
从offset开始,将len个字节读入缓冲区buf。 | read(byte[] buf, int off, int len) | Reader::Read(p []byte) (组合自io.Reader) |
从传输中读取所有len个字节。 | readAll(byte[] buf, int off, int len) | - |
将缓冲区数据写到输出 | write(byte[] buf) | Writer::Write(p []byte) (n int, err error) (组合自io.Writer) |
从缓冲区写出len字节数据 | write(byte[] buf, int off, int len) | - |
将字节序列写到缓冲区 | write(ByteBuffer src) | - |
清空传输缓冲区 | flush() | ContextFlusher::Flush(ctx context.Context) (组合自ContextFlusher) |
直接访问协议的底层缓冲区 | getBuffer() | - |
返回底层缓冲区中的索引 | getBufferPosition() | - |
获取底层缓冲区中剩余的字节数。 | getBytesRemainingInBuffer() | ReadSizeProvider::RemainingBytes() (组合自ReadSizeProvider) |
从底层缓冲区消费len字节 | consumeBuffer(int len) | - |
获取传输配置 | getConfiguration() | - |
更新消息大小 | updateKnownMessageSize(long size) | - |
校验是否可以读取数据 | checkReadBytesAvailable(long numBytes) | - |
(2) 源码解读-go
thrift 源码版本 0.16.0
https://github.com/apache/thrift/tree/0.16.0/lib/go/thrift
(2.1) TTransport
// Encapsulates the I/O layer
type TTransport interface {
io.ReadWriteCloser
Flusher
ReadSizeProvider
// Opens the transport for communication
Open() error
// Returns true if the transport is open
IsOpen() bool
}
package io
// ReadWriteCloser is the interface that groups the basic Read, Write and Close methods.
type ReadWriteCloser interface {
Reader
Writer
Closer
}
type Reader interface {
Read(p []byte) (n int, err error)
}
type Writer interface {
Write(p []byte) (n int, err error)
}
type Closer interface {
Close() error
}
package thrift
type ContextFlusher interface {
Flush(ctx context.Context) (err error)
}
type ReadSizeProvider interface {
RemainingBytes() (num_bytes uint64)
}
(2.2) 客户端阻塞型socket (TSocket)
package thrift
// TSocket 实现了 TTransport 接口
type TSocket struct {
conn *socketConn // socketConn是对net.Conn的包装
addr net.Addr // 网络地址 地址类型(tcp) 地址描述(192.0.2.1:25)
cfg *TConfiguration // 传输配置
}
创建TSocket的方式
//
// Deprecated: Use NewTSocketConf instead.
func NewTSocket(hostPort string) (*TSocket, error) {
return NewTSocketConf(hostPort, &TConfiguration{
noPropagation: true,
}), nil
}
//
// NewTSocketConf creates a net.Conn-backed TTransport, given a host and port.
//
// Example:
//
// trans, err := thrift.NewTSocketConf("localhost:9090", &TConfiguration{
// ConnectTimeout: time.Second, // Use 0 for no timeout
// SocketTimeout: time.Second, // Use 0 for no timeout
// })
func NewTSocketConf(hostPort string, conf *TConfiguration) *TSocket {
return NewTSocketFromAddrConf(tcpAddr(hostPort), conf)
}
// Deprecated: Use NewTSocketConf instead.
func NewTSocketTimeout(hostPort string, connTimeout time.Duration, soTimeout time.Duration) (*TSocket, error) {
return NewTSocketConf(hostPort, &TConfiguration{
ConnectTimeout: connTimeout,
SocketTimeout: soTimeout,
noPropagation: true,
}), nil
}
// 根据地址+配置创建 TSocket
// 最终都是通过 NewTSocketFromAddrConf 创建 TSocket
// NewTSocketFromAddrConf creates a TSocket from a net.Addr
func NewTSocketFromAddrConf(addr net.Addr, conf *TConfiguration) *TSocket {
return &TSocket{
addr: addr,
cfg: conf,
}
}
// Deprecated: Use NewTSocketFromAddrConf instead.
func NewTSocketFromAddrTimeout(addr net.Addr, connTimeout time.Duration, soTimeout time.Duration) *TSocket {
return NewTSocketFromAddrConf(addr, &TConfiguration{
ConnectTimeout: connTimeout,
SocketTimeout: soTimeout,
noPropagation: true,
})
}
// NewTSocketFromConnConf creates a TSocket from an existing net.Conn.
func NewTSocketFromConnConf(conn net.Conn, conf *TConfiguration) *TSocket {
return &TSocket{
conn: wrapSocketConn(conn),
addr: conn.RemoteAddr(),
cfg: conf,
}
}
// Deprecated: Use NewTSocketFromConnConf instead.
func NewTSocketFromConnTimeout(conn net.Conn, socketTimeout time.Duration) *TSocket {
return NewTSocketFromConnConf(conn, &TConfiguration{
SocketTimeout: socketTimeout,
noPropagation: true,
})
}
open()
// Connects the socket, creating a new socket object if necessary.
func (p *TSocket) Open() error {
// 创建前校验 校验状态是否已经开启 校验参数是否合法
// 校验是否已经可用
if p.conn.isValid() {
// 如果已经可用,抛异常
return NewTTransportException(ALREADY_OPEN, "Socket already connected.")
}
if p.addr == nil {
return NewTTransportException(NOT_OPEN, "Cannot open nil address.")
}
if len(p.addr.Network()) == 0 {
return NewTTransportException(NOT_OPEN, "Cannot open bad network name.")
}
if len(p.addr.String()) == 0 {
return NewTTransportException(NOT_OPEN, "Cannot open bad address.")
}
var err error
// 创建
if p.conn, err = createSocketConnFromReturn(net.DialTimeout(
p.addr.Network(),
p.addr.String(),
p.cfg.GetConnectTimeout(),
)); err != nil {
return &tTransportException{
typeId: NOT_OPEN,
err: err,
msg: err.Error(),
}
}
// 更新socket地址
p.addr = p.conn.RemoteAddr()
return nil
}
socketConn
p.conn.isValid()
// socketConn is a wrapped net.Conn that tries to do connectivity check.
type socketConn struct {
net.Conn
buffer [1]byte
closed int32
}
// isValid checks whether there's a valid connection.
//
// It's nil safe, and returns false if sc itself is nil, or if the underlying
// connection is nil.
//
// It's the same as the previous implementation of TSocket.IsOpen and
// TSSLSocket.IsOpen before we added connectivity check.
func (sc *socketConn) isValid() bool {
return sc != nil && sc.Conn != nil && atomic.LoadInt32(&sc.closed) == 0
}
// createSocketConnFromReturn is a language sugar to help create socketConn from
// return values of functions like net.Dial, tls.Dial, net.Listener.Accept, etc.
func createSocketConnFromReturn(conn net.Conn, err error) (*socketConn, error) {
if err != nil {
return nil, err
}
return &socketConn{
Conn: conn,
}, nil
}
net.Dialer
// 拨号器包含连接到地址的选项。
// 每个字段的零值相当于没有该选项的拨号。因此,使用 Dialer 的零值拨号等同于调用 Dial 函数。
// 同时调用 Dialer 的方法是安全的。
//
type Dialer struct {
// 超时是拨号等待连接完成的最长时间。如果还设置了截止日期,它可能会提前失败。// 默认是没有超时。
// 当使用 TCP 并拨打具有多个 IP 地址的主机名时,超时可能会在它们之间划分。
// 无论有没有超时,操作系统都可以强加自己的早期超时。例如,TCP 超时通常在 3 分钟左右。
Timeout time.Duration
// 截止日期是拨号失败的绝对时间点。如果设置了 Timeout,它可能会提前失败。
// 零表示没有截止日期,或者与超时选项一样取决于操作系统。
Deadline time.Time
// LocalAddr is the local address to use when dialing an
// address. The address must be of a compatible type for the
// network being dialed.
// If nil, a local address is automatically chosen.
LocalAddr Addr
// DualStack previously enabled RFC 6555 Fast Fallback
// support, also known as "Happy Eyeballs", in which IPv4 is
// tried soon if IPv6 appears to be misconfigured and
// hanging.
//
// Deprecated: Fast Fallback is enabled by default. To
// disable, set FallbackDelay to a negative value.
DualStack bool
// FallbackDelay specifies the length of time to wait before
// spawning a RFC 6555 Fast Fallback connection. That is, this
// is the amount of time to wait for IPv6 to succeed before
// assuming that IPv6 is misconfigured and falling back to
// IPv4.
//
// If zero, a default delay of 300ms is used.
// A negative value disables Fast Fallback support.
FallbackDelay time.Duration
// KeepAlive specifies the interval between keep-alive
// probes for an active network connection.
// If zero, keep-alive probes are sent with a default value
// (currently 15 seconds), if supported by the protocol and operating
// system. Network protocols or operating systems that do
// not support keep-alives ignore this field.
// If negative, keep-alive probes are disabled.
KeepAlive time.Duration
// Resolver optionally specifies an alternate resolver to use.
Resolver *Resolver
// Cancel is an optional channel whose closure indicates that
// the dial should be canceled. Not all types of dials support
// cancellation.
//
// Deprecated: Use DialContext instead.
Cancel <-chan struct{}
// If Control is not nil, it is called after creating the network
// connection but before actually dialing.
//
// Network and address parameters passed to Control method are not
// necessarily the ones passed to Dial. For example, passing "tcp" to Dial
// will cause the Control function to be called with "tcp4" or "tcp6".
Control func(network, address string, c syscall.RawConn) error
}
// 拨号连接到指定网络上的地址。
//
// 有关网络和地址参数的说明,请参见 func Dial。
//
// Dial 在内部使用 context.Background;要指定上下文,请使用 DialContext。
func (d *Dialer) Dial(network, address string) (Conn, error) {
return d.DialContext(context.Background(), network, address)
}
(2.3) TFramedTransport
package thrift
// framed_transport.go
type TFramedTransport struct {
transport TTransport //
cfg *TConfiguration //
writeBuf *bytes.Buffer //
reader *bufio.Reader //
readBuf *bytes.Buffer //
buffer [4]byte //
}
(2.4) 服务端非阻塞型socket(TServerSocket)
// thrift/server_socket.go
package thrift
// TServerSocket
// 实现了 TServerTransport 接口
type TServerSocket struct {
listener net.Listener // 监听
addr net.Addr // 地址
clientTimeout time.Duration // 超时时间
// Protects the interrupted value to make it thread safe.
mu sync.RWMutex // 读写锁
interrupted bool / 是否中断
}
// thrift/server_transport.go
package thrift
// 服务器传输。提供客户端传输的对象。
//
// Server transport. Object which provides client transports.
type TServerTransport interface {
Listen() error // 监听
Accept() (TTransport, error) // 接收请求
Close() error // 关闭
// 可选方法实现。 向服务器传输发出信号,表明它应该中断当前被阻止的任何 accept() 或 listen() 。
// 此方法(如果实现)必须是线程安全的,因为它可能从与其他 TServerTransport 方法不同的线程上下文中调用。
//
// Optional method implementation. This signals to the server transport
// that it should break out of any accept() or listen() that it is currently
// blocked on. This method, if implemented, MUST be thread safe, as it may
// be called from a different thread context than the other TServerTransport
// methods.
Interrupt() error // 中断
}
创建TServerSocket的方法
// 根据地址创建TServerSocket
func NewTServerSocket(listenAddr string) (*TServerSocket, error) {
return NewTServerSocketTimeout(listenAddr, 0)
}
// 根据地址和超时时间创建TServerSocket
func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) {
// 解析tcp地址
addr, err := net.ResolveTCPAddr("tcp", listenAddr)
if err != nil {
return nil, err
}
return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil
}
// 根据地址创建TServerSocket
// Creates a TServerSocket from a net.Addr
func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket {
return &TServerSocket{addr: addr, clientTimeout: clientTimeout}
}
监听请求 Listen()
func (p *TServerSocket) Listen() error {
p.mu.Lock() // 加互斥锁 保护监听资源
defer p.mu.Unlock() // 互斥锁解锁
// 检查socket是否还在监听 如果在监听直接返回
if p.IsListening() {
return nil
}
// 监听指定地址 这里是监听 tcp localhost:9090
l, err := net.Listen(p.addr.Network(), p.addr.String())
if err != nil {
return err
}
// 更新监听对象
p.listener = l
return nil
}
// 检查socket是否还在监听
// Checks whether the socket is listening.
func (p *TServerSocket) IsListening() bool {
return p.listener != nil
}
接收请求 Accept()
func (p *TServerSocket) Accept() (TTransport, error) {
p.mu.RLock() // (interrupted资源) 加写锁
interrupted := p.interrupted // 更新interrupted值
p.mu.RUnlock() // 释放(interrupted资源)写锁
// 如果已经中断,抛异常
if interrupted {
return nil, errTransportInterrupted
}
p.mu.Lock() // (listener资源)加互斥锁
listener := p.listener // 更新listener值
p.mu.Unlock() // (listener资源)解锁
if listener == nil {
return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
}
// 接收请求 创建socket、建立连接
conn, err := listener.Accept()
if err != nil {
return nil, NewTTransportExceptionFromError(err)
}
// 返回创建的连接
return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil
}
Open()
// 连接socket,创建一个socket
// Connects the socket, creating a new socket object if necessary.
func (p *TServerSocket) Open() error {
p.mu.Lock() // (listener资源)加互斥锁
defer p.mu.Unlock() // (listener资源)解锁
// 已经在监听
if p.IsListening() {
return NewTTransportException(ALREADY_OPEN, "Server socket already open")
}
// 创建监听
if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil {
return err
} else {
// 更新监听
p.listener = l
}
return nil
}
Close()
func (p *TServerSocket) Close() error {
var err error
p.mu.Lock()
if p.IsListening() {
err = p.listener.Close()
p.listener = nil
}
p.mu.Unlock()
return err
}
Interrupt()
func (p *TServerSocket) Interrupt() error {
p.mu.Lock()
p.interrupted = true
p.mu.Unlock()
p.Close()
return nil
}
(3) 源码解读-Java
thrift 源码版本 0.16.0
https://github.com/apache/thrift/tree/0.16.0/lib/java/thrift
参考
[1] 和 Thrift 的一场美丽邂逅