thrift Server

(1) What the thrift Server Does

The Server ties all of thrift's pieces together (a minimal sketch follows the list below):

1. Create a Transport;
2. Create the input/output Protocols for the Transport;
3. Create a Processor for the I/O Protocols;
4. Start the service and wait for client connections.
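In the Go implementation these four steps map onto a handful of library calls. The full demo appears in section (3); the following is only a minimal sketch, and the binary protocol, the 8 KiB buffered transport, and the tutorial Calculator service are assumptions rather than the only possible choices.

// minimal sketch of the four steps above, using the tutorial Calculator service (assumed)
func sketchSimpleServer() error {
	// 1. create a Transport: a plain TCP server socket
	transport, err := thrift.NewTServerSocket("localhost:9090")
	if err != nil {
		return err
	}
	// 2. create the I/O Protocol (and per-connection transport) factories
	protocolFactory := thrift.NewTBinaryProtocolFactoryConf(nil)
	transportFactory := thrift.NewTBufferedTransportFactory(8192)
	// 3. create a Processor for the I/O Protocols, wrapping the service handler
	processor := tutorial.NewCalculatorProcessor(NewCalculatorHandler())
	// 4. assemble the server and wait for client connections
	server := thrift.NewTSimpleServer4(processor, transport, transportFactory, protocolFactory)
	return server.Serve()
}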

Different thrift language implementations provide different server modes.

The Java implementation provides several server modes: TSimpleServer, TThreadPoolServer, TNonblockingServer, THsHaServer, and TThreadedSelectorServer.

The Go implementation provides TSimpleServer.

| IO model | Java | Go | Characteristics |
| --- | --- | --- | --- |
| Blocking IO | TSimpleServer | - | A single worker thread loops, accepting new requests and handling them; only one socket connection is accepted and processed at a time, so throughput is low. |
| Blocking IO | TThreadPoolServer | TSimpleServer | - |
| IO multiplexing | TNonblockingServer | - | TNonblockingServer runs single-threaded, uses NIO, and registers all sockets with a selector. |

Grouped by whether the IO is blocking, the servers fall into blocking and non-blocking service models.
Blocking service models: TSimpleServer, TThreadPoolServer.
Non-blocking service models: TNonblockingServer, THsHaServer and TThreadedSelectorServer.

Grouped by network service model, they fall into single thread/goroutine, multiple threads/goroutines, and event-driven designs.

(2) Server Startup and Request-Handling Flow

  1. Server initialization
    1.1 Create the ServerHandler (calculatorHandler in the demo)
    1.2 Build a processor around the ServerHandler; the per-method processors it registers implement the thrift.TProcessorFunction interface
    1.3 Create the ServerTransport (the server's concrete implementation)
    1.4 Create the Server from the protocol factory (protocolFactory), transport factory (transportFactory), server transport (ServerTransport) and service handler (ServerHandler)
  2. Server startup
    2.1 Start the server and listen on the configured address and port
    2.2 Block in Accept, waiting for client connections
    2.3 Handle incoming requests in a loop
  3. Request handling
    3.1 Look up the processor registered for the requested method and invoke it
  4. Server shutdown

(3) Demo-Go

Source code: https://github.com/weikeqin/thrift-tutorial-go-demo

func main() {
	
	addr := flag.String("addr", "localhost:9090", "Address to listen to")
	var protocolFactory thrift.TProtocolFactory
	var transportFactory thrift.TTransportFactory
	var transport thrift.TServerTransport
	// ... setup of the protocol/transport factories and the server transport omitted ...

	// CalculatorHandler implements the Calculator interface
	calculatorHandler := NewCalculatorHandler()
	// CalculatorProcessor implements the TProcessor interface
	calculatorProcessor := tutorial.NewCalculatorProcessor(calculatorHandler)
	// create the TSimpleServer
	server := thrift.NewTSimpleServer4(calculatorProcessor, transport, transportFactory, protocolFactory)
	fmt.Println("Starting the simple server... on", *addr)
	// start the server (Serve blocks until the server is stopped)
	err := server.Serve()
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("server exited")

}

(4) Source Code Analysis: Go

(4.1) TSimpleServer

First, look at the TSimpleServer struct. It holds the server state, a wait group for in-flight requests, a mutex, the processor factory, the server transport, and the input/output transport and protocol factories.

/*
 * This is not a typical TSimpleServer as it is not blocked after accept a socket.
 * It is more like a TThreadedServer that can handle different connections in different goroutines.
 * This will work if golang user implements a conn-pool like thing in client side.
 */
type TSimpleServer struct {
	closed int32          // closed flag (1 = stopped)
	wg     sync.WaitGroup // waits for in-flight request goroutines
	mu     sync.Mutex     // mutex protecting accept/stop coordination

	processorFactory       TProcessorFactory // processor factory
	serverTransport        TServerTransport  // server transport (the actual server implementation)
	inputTransportFactory  TTransportFactory // input transport factory
	outputTransportFactory TTransportFactory // output transport factory
	inputProtocolFactory   TProtocolFactory  // input protocol factory
	outputProtocolFactory  TProtocolFactory  // output protocol factory

	// Headers to auto forward in THeaderProtocol
	forwardHeaders []string

	logger Logger
}

(4.1.1) Serve

Serve is the method that starts the server. It does two things:
1. Listen on the configured address and port.
2. Block in the accept loop, receiving and handling client requests.

(Figure: Go TSimpleServer flow diagram)

// 
func (p *TSimpleServer) Serve() error {
	p.logger = fallbackLogger(p.logger)

	// start listening
	err := p.Listen()
	if err != nil {
		return err
	}

	// block, accepting and handling client requests in a loop
	p.AcceptLoop()
	return nil
}

(4.1.2) Listen()

// start listening
func (p *TSimpleServer) Listen() error {
	return p.serverTransport.Listen()
}

(4.1.3) AcceptLoop()

// accept requests in a loop
func (p *TSimpleServer) AcceptLoop() error {
	for {
		// accept and dispatch one connection
		closed, err := p.innerAccept()
		// an error here means the serverTransport failed while accepting
		if err != nil {
			// leave the loop and return the error
			return err
		}
		// closed != 0 means the server has been stopped
		if closed != 0 {
			// server closed, leave the loop
			return nil
		}
	}
}

(4.1.4) innerAccept

func (p *TSimpleServer) innerAccept() (int32, error) {
	// accept a request; blocks here until a connection arrives
	client, err := p.serverTransport.Accept()
	// take the mutex so accepting and stopping do not race
	p.mu.Lock()
	defer p.mu.Unlock()
	// load the closed flag
	closed := atomic.LoadInt32(&p.closed)
	// check whether the server has been stopped (closed == 1 means stopped)
	if closed != 0 {
		return closed, nil
	}
	// did the serverTransport fail while accepting?
	if err != nil {
		return 0, err
	}
	// client is the transport for this connection
	if client != nil {
		p.wg.Add(1)
		// handle the connection in its own goroutine
		go func() {
			defer p.wg.Done()
			// process requests on this connection
			if err := p.processRequests(client); err != nil {
				p.logger(fmt.Sprintf("error processing request: %v", err))
			}
		}()
	}
	return 0, nil
}

(4.1.5) processRequests

// process all requests on one client connection
func (p *TSimpleServer) processRequests(client TTransport) (err error) {
	defer func() {
		err = treatEOFErrorsAsNil(err)
	}()
	// get the processor
	// in the demo this is a *tutorial.CalculatorProcessor
	processor := p.processorFactory.GetProcessor(client)
	// get the input transport
	inputTransport, err := p.inputTransportFactory.GetTransport(client)
	if err != nil {
		return err
	}
	// input protocol
	inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
	var outputTransport TTransport
	var outputProtocol TProtocol

	// for THeaderProtocol, we must use the same protocol instance for
	// input and output so that the response is in the same dialect that
	// the server detected the request was in.
	headerProtocol, ok := inputProtocol.(*THeaderProtocol)
	if ok {
		outputProtocol = inputProtocol
	} else {
		// output transport
		oTrans, err := p.outputTransportFactory.GetTransport(client)
		if err != nil {
			return err
		}
		outputTransport = oTrans
		// output protocol
		outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport)
	}

	if inputTransport != nil {
		defer inputTransport.Close()
	}
	if outputTransport != nil {
		defer outputTransport.Close()
	}
	for {
		if atomic.LoadInt32(&p.closed) != 0 {
			return nil
		}

		ctx := SetResponseHelper(
			defaultCtx,
			TResponseHelper{
				THeaderResponseHelper: NewTHeaderResponseHelper(outputProtocol),
			},
		)
		if headerProtocol != nil {
			// We need to call ReadFrame here, otherwise we won't
			// get any headers on the AddReadTHeaderToContext call.
			//
			// ReadFrame is safe to be called multiple times so it
			// won't break when it's called again later when we
			// actually start to read the message.
			if err := headerProtocol.ReadFrame(ctx); err != nil {
				return err
			}
			ctx = AddReadTHeaderToContext(ctx, headerProtocol.GetReadHeaders())
			ctx = SetWriteHeaderList(ctx, p.forwardHeaders)
		}

		// dispatch to the matching handler method;
		// the response is written into outputProtocol's TTransport
		ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
		// the handler asked to abandon the request: close the connection
		if errors.Is(err, ErrAbandonRequest) {
			return client.Close()
		}
		// transport-level error
		if errors.As(err, new(TTransportException)) && err != nil {
			// the frequently seen "read: connection reset by peer" error is returned from here, e.g.
			// read tcp 127.0.0.1:9090->127.0.0.1:58470: read: connection reset by peer
			// EOF also shows up here
			return err
		}
		var tae TApplicationException
		if errors.As(err, &tae) && tae.TypeId() == UNKNOWN_METHOD {
			continue
		}
		if !ok {
			break
		}
	}
	return nil
}
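One detail worth calling out: processRequests treats thrift.ErrAbandonRequest specially and closes the client connection instead of writing a response; per the library, handler implementations may return this error to signal exactly that. A hedged sketch of a handler method using it follows; the CalculatorHandler type and the context-aware Add signature mirror the tutorial-style generated code and are assumptions here.

// Hypothetical handler method; the signature follows the context-aware
// code generated for the tutorial Calculator service.
func (h *CalculatorHandler) Add(ctx context.Context, num1 int32, num2 int32) (int32, error) {
	select {
	case <-ctx.Done():
		// The caller is gone or the request was cancelled: returning
		// thrift.ErrAbandonRequest makes processRequests close this
		// connection instead of trying to write a response.
		return 0, thrift.ErrAbandonRequest
	default:
	}
	return num1 + num2, nil
}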
(4.1.6) Stop

// 
func (p *TSimpleServer) Stop() error {
	p.mu.Lock()         // take the mutex
	defer p.mu.Unlock() // and release it when we return
	// already stopped? (closed == 1 means stopped, closed == 0 means running)
	if atomic.LoadInt32(&p.closed) != 0 {
		return nil
	}
	// mark the server as closed
	atomic.StoreInt32(&p.closed, 1)
	// interrupt the blocking Accept
	p.serverTransport.Interrupt()
	// wait for all in-flight requests to finish
	p.wg.Wait()
	return nil
}
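Since Serve blocks and Stop waits on the WaitGroup for in-flight requests, a common pattern is to run Serve in a goroutine and call Stop from a signal handler. A minimal sketch follows; the function name is arbitrary and it is assumed to live in the demo's main package (which already imports fmt and the thrift package), additionally needing os, os/signal and syscall.

func serveWithGracefulStop(server *thrift.TSimpleServer) {
	go func() {
		// Serve blocks until Stop is called (or Listen/Accept fails).
		if err := server.Serve(); err != nil {
			fmt.Println("serve error:", err)
		}
	}()

	// Block until SIGINT or SIGTERM arrives.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	// Stop sets closed=1, interrupts the blocking Accept, and waits
	// (wg.Wait) for all in-flight request goroutines to finish.
	if err := server.Stop(); err != nil {
		fmt.Println("stop error:", err)
	}
}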

TServerTransport

// Server transport. Object which provides client transports.
type TServerTransport interface {
	Listen() error
	Accept() (TTransport, error)
	Close() error

	// Optional method implementation. This signals to the server transport
	// that it should break out of any accept() or listen() that it is currently
	// blocked on. This method, if implemented, MUST be thread safe, as it may
	// be called from a different thread context than the other TServerTransport
	// methods.
	Interrupt() error
}

There are two implementations of the TServerTransport interface: TServerSocket and TSSLServerSocket.

// 
type TServerSocket struct {
	listener      net.Listener
	addr          net.Addr
	clientTimeout time.Duration

	// Protects the interrupted value to make it thread safe.
	mu          sync.RWMutex
	interrupted bool
}


// 
func (p *TServerSocket) Listen() error {
	// take the mutex to protect p.listener
	p.mu.Lock()
	defer p.mu.Unlock()
	// already listening, nothing to do
	if p.IsListening() {
		return nil
	}
	// listen on the configured address and port
	l, err := net.Listen(p.addr.Network(), p.addr.String())
	if err != nil {
		return err
	}
	// keep the listener
	p.listener = l
	return nil
}
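TServerSocket is normally created through the library constructors rather than by filling in the struct by hand. The clientTimeout field is worth noting: it becomes the connection deadline of every TSocket returned by Accept, which is typically where the "i/o timeout" errors discussed in the Problems section at the end come from. A small hedged sketch (the addresses and the 30-second value are arbitrary):

func newServerSockets() (thrift.TServerTransport, thrift.TServerTransport, error) {
	// without a per-connection deadline (clientTimeout is zero)
	plain, err := thrift.NewTServerSocket("localhost:9090")
	if err != nil {
		return nil, nil, err
	}

	// with a client timeout: an accepted connection that stays idle longer
	// than 30s shows up as "read ...: i/o timeout" on the server side
	withTimeout, err := thrift.NewTServerSocketTimeout("localhost:9091", 30*time.Second)
	if err != nil {
		return nil, nil, err
	}
	return plain, withTimeout, nil
}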

(5) Demo-Java

// 

(6) Source Code Analysis: Java

(6.1) TServer

package org.apache.thrift.server;

/**
 * Generic interface for a Thrift server.
 *
 */
public abstract class TServer {

  /**
   * Core processor factory
   */
  protected TProcessorFactory processorFactory_;

  /**
   * Server transport
   */
  protected TServerTransport serverTransport_;

  /**
   * Input Transport Factory
   */
  protected TTransportFactory inputTransportFactory_;

  /**
   * Output Transport Factory
   */
  protected TTransportFactory outputTransportFactory_;

  /**
   * Input Protocol Factory
   */
  protected TProtocolFactory inputProtocolFactory_;

  /**
   * Output Protocol Factory
   */
  protected TProtocolFactory outputProtocolFactory_;

  /**
   * Serving state (started / stopped);
   * volatile guarantees visibility across threads.
   */
  private volatile boolean isServing;

  protected TServerEventHandler eventHandler_;

  // Flag for stopping the server
  // Please see THRIFT-1795 for the usage of this flag
  protected volatile boolean stopped_ = false;

  /**
   * Constructor.
   * Sets the processor factory, server transport, input/output transport factories, and input/output protocol factories.
   */
  protected TServer(AbstractServerArgs args) {
    processorFactory_ = args.processorFactory;
    serverTransport_ = args.serverTransport;
    inputTransportFactory_ = args.inputTransportFactory;
    outputTransportFactory_ = args.outputTransportFactory;
    inputProtocolFactory_ = args.inputProtocolFactory;
    outputProtocolFactory_ = args.outputProtocolFactory;
  }

  public static class Args extends AbstractServerArgs<Args> {
    public Args(TServerTransport transport) {
      super(transport);
    }
  }

  /**
   * The run method fires up the server and gets things going.
   */
  public abstract void serve();

  /**
   * Stop the server. This is optional on a per-implementation basis. Not
   * all servers are required to be cleanly stoppable.
   */
  public void stop() {}

}

As you can see, TServer is an abstract class:

  1. It defines everything needed to build a server: the processor factory, the server transport (the server's concrete implementation), the input/output transport factories, and the input/output protocol factories.
  2. It defines the TServer constructor.
  3. The serve() method is abstract and must be implemented by each subclass.
  4. The stop() method is a regular method that subclasses may optionally override.

(6.2) TSimpleServer

The Java TSimpleServer uses the simplest blocking IO. The implementation is concise and easy to follow, which makes it a good entry point for learning; it is meant for demonstrating the Thrift workflow and for testing, and is not used in production.

The Java TSimpleServer accepts and handles only one socket connection at a time, so its throughput is low.

package org.apache.thrift.server;

/**
 * Simple singlethreaded server for testing.
 *
 */
public class TSimpleServer extends TServer {

  public TSimpleServer(AbstractServerArgs args) {
    super(args);
  }

}

(6.2.1) serve

TSimpleServer startup flow:

// start serving
public void serve() {
  try {
    // listen on the configured address and port
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }

  // 
  // Run the preServe event
  if (eventHandler_ != null) {
    eventHandler_.preServe();
  }

  // mark the server as serving
  setServing(true);

  // 
  while (!stopped_) {
    // transport for the accepted client
    TTransport client = null;
    // processor that dispatches to the handler methods
    TProcessor processor = null;
    // input transport
    TTransport inputTransport = null;
    // output transport
    TTransport outputTransport = null;
    // input protocol
    TProtocol inputProtocol = null;
    // output protocol
    TProtocol outputProtocol = null;
    ServerContext connectionContext = null;
    try {
      // accept a connection; this call blocks
      client = serverTransport_.accept();
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        if (eventHandler_ != null) {
          connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
        }
        // handle requests on this connection in a loop
        while (true) {
          if (eventHandler_ != null) {
            // 
            eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
          }
          // dispatch to the matching method and handle the request
          processor.process(inputProtocol, outputProtocol);
        }
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
      LOGGER.debug("Client Transportation Exception", ttx);
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    // delete the connection context
    if (eventHandler_ != null) {
      eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    }

    // close the input transport
    if (inputTransport != null) {
      inputTransport.close();
    }

    // close the output transport
    if (outputTransport != null) {
      outputTransport.close();
    }

  }

  // mark the server as no longer serving
  setServing(false);
}

(6.3) TThreadPoolServer

/**
 * Server which uses Java's built in ThreadPool management to spawn off
 * a worker pool that deals with client connections in blocking way.
 */
public class TThreadPoolServer extends TServer {

  // Executor service for handling client connections
  private ExecutorService executorService_;

  private final TimeUnit stopTimeoutUnit;
  
  // timeout used when stopping the thread pool
  private final long stopTimeoutVal;

  public TThreadPoolServer(Args args) {
    super(args);

    stopTimeoutUnit = args.stopTimeoutUnit;
    stopTimeoutVal = args.stopTimeoutVal;

    executorService_ = args.executorService != null ?
        args.executorService : createDefaultExecutorService(args);
  }

  /**
   * Creates the default thread pool:
   * configures the core pool size, maximum pool size, worker keep-alive time, and the (synchronous) work queue.
   */
  private static ExecutorService createDefaultExecutorService(Args args) {
    return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<>(), new ThreadFactory() {
          final AtomicLong count = new AtomicLong();
          @Override
          public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName(String.format("TThreadPoolServer WorkerProcess-%d", count.getAndIncrement()));
            return thread;
          }
        });
  }

}

(6.3.1) serve()

/**
 * Start the service, handle requests in a loop,
 * and shut the thread pool down gracefully when stopping.
 */
public void serve() {
  // 
  if (!preServe()) {
    return;
  }

  // execute() blocks here until the server is stopped
  execute();

  // gracefully shut down the thread pool:
  // stop accepting new tasks
  executorService_.shutdownNow();
  
  // wait for the tasks in the pool to finish
  if (!waitForShutdown()) {
    LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
  }

  // mark the server as stopped
  setServing(false);
}
/**
  * Pre-serve setup.
  */
protected boolean preServe() {
  try {
    // listen on the configured address and port
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return false;
  }

  // Run the preServe event
  if (eventHandler_ != null) {
    eventHandler_.preServe();
  }
  stopped_ = false;
  // mark the server as serving
  setServing(true);
  return true;
}
/**
 * Accept connections and hand each one to the worker pool.
 */
protected void execute() {
  // loop while the server has not been stopped
  while (!stopped_) {
    try {
      // accept a client connection
      TTransport client = serverTransport_.accept();
      try {
        // wrap the connection in a task and submit it to the thread pool
        executorService_.execute(new WorkerProcess(client));
      } catch (RejectedExecutionException ree) {
        // rejection policy when the pool is saturated
        if (!stopped_) {
          LOGGER.warn("ThreadPool is saturated with incoming requests. Closing latest connection.");
        }
        // close the transport to the client
        client.close();
      }
    } catch (TTransportException ttx) {
      // transport error while talking to the client; nothing to do here
      if (!stopped_) { // if the server has not been stopped, log a warning
        LOGGER.warn("Transport error occurred during acceptance of message", ttx);
      }
    }
  }
}
/**
 * Gracefully stop the thread pool:
 * wait for running tasks to finish before shutting down.
 */
protected boolean waitForShutdown() {
  // Loop until awaitTermination finally does return without a interrupted
  // exception. If we don't do this, then we'll shut down prematurely. We want
  // to let the executorService clear it's task queue, closing client sockets
  // appropriately.
  long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
  long now = System.currentTimeMillis();
  while (timeoutMS >= 0) {
    try {
      // 
      return executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ix) {
      long newnow = System.currentTimeMillis();
      timeoutMS -= (newnow - now);
      now = newnow;
    }
  }
  return false;
}
/**
 * Worker that processes a single client connection.
 */
private class WorkerProcess implements Runnable {

  /**
   * Client that this services.
   */
  private TTransport client_;

  /**
   * Default constructor.
   *
   * @param client Transport to process
   */
  private WorkerProcess(TTransport client) {
    client_ = client;
  }

  /**
   * Loops on processing a client forever
   */
  public void run() {
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;

    Optional<TServerEventHandler> eventHandler = Optional.empty();
    ServerContext connectionContext = null;

    try {
      // get the processor
      processor = processorFactory_.getProcessor(client_);
      // get the input transport
      inputTransport = inputTransportFactory_.getTransport(client_);
      // get the output transport
      outputTransport = outputTransportFactory_.getTransport(client_);
      // get the input protocol
      inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
      // get the output protocol
      outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);

      // event handler
      eventHandler = Optional.ofNullable(getEventHandler());

      if (eventHandler.isPresent()) {
        connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
      }

      while (true) {
        // has this worker thread been interrupted?
        if (Thread.currentThread().isInterrupted()) {
          LOGGER.debug("WorkerProcess requested to shutdown");
          break;
        }
        // 
        if (eventHandler.isPresent()) {
          eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
        }

        // core processing logic
        //
        // This process cannot be interrupted by Interrupting the Thread. This
        // will return once a message has been processed or the socket timeout
        // has elapsed, at which point it will return and check the interrupt
        // state of the thread.
        processor.process(inputProtocol, outputProtocol);
      }
    } catch (Exception x) {
      LOGGER.debug("Error processing request", x);

      //
      // We'll usually receive RuntimeException types here
      // Need to unwrap to ascertain real causing exception before we choose to ignore
      // Ignore err-logging all transport-level/type exceptions
      if (!isIgnorableException(x)) {
        // Log the exception at error level and continue
        LOGGER.error((x instanceof TException ? "Thrift " : "") + "Error occurred during processing of message.", x);
      }
    } finally {
      // 
      if (eventHandler.isPresent()) {
        eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
      }
      // close the input transport
      if (inputTransport != null) {
        inputTransport.close();
      }
      // close the output transport
      if (outputTransport != null) {
        outputTransport.close();
      }
      // close the client connection
      if (client_.isOpen()) {
        client_.close();
      }
    }
  }

  /**
   * Decide whether an exception can be ignored:
   * read-timeout and end-of-file transport exceptions are ignorable.
   */
  private boolean isIgnorableException(Exception x) {
    TTransportException tTransportException = null;

    if (x instanceof TTransportException) {
      tTransportException = (TTransportException) x;
    } else if (x.getCause() instanceof TTransportException) {
      tTransportException = (TTransportException) x.getCause();
    }

    if (tTransportException != null) {
      switch(tTransportException.getType()) {
        // end of file
        case TTransportException.END_OF_FILE:
        // transport timed out
        case TTransportException.TIMED_OUT:
          return true;
      }
    }
    return false;
  }
}

(6.4) AbstractNonblockingServer

/**
 * Provides common methods and classes used by nonblocking TServer
 * implementations.
 */
public abstract class AbstractNonblockingServer extends TServer {

  /**
   * The maximum amount of memory we will allocate to client IO buffers at a
   * time. Without this limit, the server will gladly allocate client buffers
   * right into an out of memory exception, rather than waiting.
   */
  final long MAX_READ_BUFFER_BYTES;

  /**
   * How many bytes are currently allocated to read buffers.
   */
  final AtomicLong readBufferBytesAllocated = new AtomicLong(0);

  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
  }


}

(6.4.1) serve()


/**
 * Begin accepting connections and processing invocations.
 */
public void serve() {
  // start any IO threads
  if (!startThreads()) {
    return;
  }

  // start listening, or exit
  if (!startListening()) {
    return;
  }

  // mark the server as serving
  setServing(true);

  // this will block while we serve
  waitForShutdown();

  // mark the server as no longer serving
  setServing(false);

  // graceful shutdown:
  // do a little cleanup
  stopListening();
}
/**
 * Starts any threads required for serving.
 *
 * @return true if everything went ok, false if threads could not be started.
 */
protected abstract boolean startThreads();

/**
 * A method that will block until when threads handling the serving have been
 * shut down.
 */
protected abstract void waitForShutdown();

/**
 * Have the server transport start accepting connections.
 *
 * @return true if we started listening successfully, false if something went
 *         wrong.
 */
protected boolean startListening() {
  try {
    // listen on the configured address and port
    serverTransport_.listen();
    return true;
  } catch (TTransportException ttx) {
    LOGGER.error("Failed to start listening on server socket!", ttx);
    return false;
  }
}

/**
 * Stop listening for connections.
 */
protected void stopListening() {
  serverTransport_.close();
}
/**
 * Possible states for the FrameBuffer state machine.
 */
private enum FrameBufferState {
  // in the midst of reading the frame size off the wire
  READING_FRAME_SIZE,

  // reading the actual frame data now, but not all the way done yet
  READING_FRAME,

  // completely read the frame, so an invocation can now happen
  READ_FRAME_COMPLETE,

  // waiting to get switched to listening for write events
  AWAITING_REGISTER_WRITE,

  // started writing response data, not fully complete yet
  WRITING,

  // another thread wants this framebuffer to go back to reading
  AWAITING_REGISTER_READ,

  // we want our transport and selection key invalidated in the selector
  // thread
  AWAITING_CLOSE
}

(6.5) TNonblockingServer

package org.apache.thrift.server;

/**
 * A nonblocking TServer implementation. This allows for fairness amongst all
 * connected clients in terms of invocations.
 *
 * This server is inherently single-threaded. If you want a limited thread pool
 * coupled with invocation-fairness, see THsHaServer.
 *
 * To use this server, you MUST use a TFramedTransport at the outermost
 * transport, otherwise this server will be unable to determine when a whole
 * method call has been read off the wire. Clients must also use TFramedTransport.
 */
public class TNonblockingServer extends AbstractNonblockingServer {

  // the select/accept thread
  private SelectAcceptThread selectAcceptThread_;

  public TNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
  }

}

The important methods in TNonblockingServer are TNonblockingServer(), startThreads(), requestInvoke(), waitForShutdown(), joinSelector(), isStopped() and stop(),
plus one important inner class, SelectAcceptThread.

(6.5.1) startThreads()

/**
 * Start the selector thread to deal with accepts and client messages.
 *
 * @return true if everything went ok, false if we couldn't start for some
 * reason.
 */
@Override
protected boolean startThreads() {
  // start the selector
  try {
    // create a new select/accept thread
    selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
    // start the thread
    selectAcceptThread_.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start selector thread!", e);
    return false;
  }
}

(6.5.2) SelectAcceptThread (the key class)

/**
 * The thread that will be doing all the selecting, managing new connections
 * and those that still need to be read.
 */
protected class SelectAcceptThread extends AbstractSelectThread {

  // The server transport on which new client transports will be accepted
  private final TNonblockingServerTransport serverTransport;

  /**
   * Set up the thread that will handle the non-blocking accepts, reads, and
   * writes.
   */
  public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
  throws IOException {
    this.serverTransport = serverTransport;
    serverTransport.registerSelector(selector);
  }
}

The important methods of SelectAcceptThread are its constructor, run(), select() and handleAccept().

run()

/**
 * The work loop. Handles both selecting (all IO operations) and managing
 * the selection preferences of all existing connections.
 */
public void run() {
  try {
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }

    // loop as long as the server is not stopped
    while (!stopped_) {
      // select and handle IO events
      select();
      // apply pending interest changes
      processInterestChanges();
    }

    for (SelectionKey selectionKey : selector.keys()) {
      // clean up
      cleanupSelectionKey(selectionKey);
    }
  } catch (Throwable t) {
    LOGGER.error("run() exiting due to uncaught error", t);
  } finally {
    try {
      selector.close();
    } catch (IOException e) {
      LOGGER.error("Got an IOException while closing selector!", e);
    }
    stopped_ = true;
  }
}

select()


/**
 * Select and process IO events appropriately:
 * If there are connections to be accepted, accept them.
 * If there are existing connections with data waiting to be read, read it,
 * buffering until a whole frame has been read.
 * If there are any pending responses, buffer them until their target client
 * is available, and then send the data.
 */
private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    // while the server is running, process each selected key
    while (!stopped_ && selectedKeys.hasNext()) {
      // 
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        // handle the accept (new connection)
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

handleAccept()

/**
 * Accept a new connection.
 */
private void handleAccept() throws IOException {
  SelectionKey clientKey = null;
  TNonblockingTransport client = null;
  try {
    // accept the connection
    client = serverTransport.accept();
    // register the client with the selector for read events
    clientKey = client.registerSelector(selector, SelectionKey.OP_READ);

    //  
    // add this key to the map
    FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);

    // attach the frameBuffer to the clientKey
    clientKey.attach(frameBuffer);

  } catch (TTransportException tte) {
    // something went wrong accepting.
    LOGGER.warn("Exception trying to accept!", tte);
    if (clientKey != null) cleanupSelectionKey(clientKey);
    if (client != null) client.close();
  }
}
/**
 * Create a FrameBuffer for an accepted connection (async variant when the processor is async).
 */
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
    final SelectionKey selectionKey,
    final AbstractSelectThread selectThread) throws TTransportException {
    return processorFactory_.isAsyncProcessor() ?
              new AsyncFrameBuffer(trans, selectionKey, selectThread) :
              new FrameBuffer(trans, selectionKey, selectThread);
}

(6.5.3) AbstractNonblockingServer

handleRead(key)

package org.apache.thrift.server;
// class AbstractNonblockingServer

/**
 * Do the work required to read from a readable client. If the frame is
 * fully read, then invoke the method call.
 */
protected void handleRead(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  // read from the client; if the read fails, clean up the key and return
  if (!buffer.read()) {
    cleanupSelectionKey(key);
    return;
  }

  // if the buffer's frame read is complete, invoke the method.
  if (buffer.isFrameFullyRead()) {
    // 
    if (!requestInvoke(buffer)) {
      // 
      cleanupSelectionKey(key);
    }
  }
}
package org.apache.thrift.server;
// class TNonblockingServer

/**
 * Perform an invocation. This method could behave several different ways
 * - invoke immediately inline, queue for separate execution, etc.
 */
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
  frameBuffer.invoke();
  return true;
}
package org.apache.thrift.server;
// class AbstractNonblockingServer

/**
 * Actually invoke the method signified by this FrameBuffer.
 */
public void invoke() {
  // 
  frameTrans_.reset(buffer_.array());
  response_.reset();

  try {
    if (eventHandler_ != null) {
      eventHandler_.processContext(context_, inTrans_, outTrans_);
    }
    // get the processor and dispatch to the matching service method
    processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
    responseReady();
    return;
  } catch (TException te) {
    LOGGER.warn("Exception while invoking!", te);
  } catch (Throwable t) {
    LOGGER.error("Unexpected throwable while invoking!", t);
  }
  // This will only be reached when there is a throwable.
  state_ = FrameBufferState.AWAITING_CLOSE;
  requestSelectInterestChange();
}

handleWrite(key)

// 

waitForShutdown()

package org.apache.thrift.server;
// class TNonblockingServer 

  /**
   * Wait for the server to shut down.
   */
  @Override
  protected void waitForShutdown() {
    // 
    joinSelector();
  }

  /**
   * Block until the selector thread exits.
   */
  protected void joinSelector() {
    // wait until the selector thread exits
    try {
      // wait for the select/accept thread to finish
      selectAcceptThread_.join();
    } catch (InterruptedException e) {
      LOGGER.debug("Interrupted while waiting for accept thread", e);
      Thread.currentThread().interrupt();
    }
  }

stop()

/**
 * Stop serving and shut everything down.
 */
@Override
public void stop() {
  stopped_ = true;
  if (selectAcceptThread_ != null) {
    // 
    selectAcceptThread_.wakeupSelector();
  }
}

(6.6) THsHaServer

package org.apache.thrift.server;

/**
 * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
 * Like TNonblockingServer, it relies on the use of TFramedTransport.
 */
public class THsHaServer extends TNonblockingServer {

  // This wraps all the functionality of queueing and thread pool management
  // for the passing of Invocations from the Selector to workers.
  private final ExecutorService invoker;

  private final Args args;

  /**
   * Create the server with the specified Args configuration
   */
  public THsHaServer(Args args) {
    super(args);

    invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
    this.args = args;
  }

}

(6.6.1) createInvokerPool()

/**
 * Helper to create an invoker pool
 */
protected static ExecutorService createInvokerPool(Args options) {
  int minWorkerThreads = options.minWorkerThreads;
  int maxWorkerThreads = options.maxWorkerThreads;
  int stopTimeoutVal = options.stopTimeoutVal;
  TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

  LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
    maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

  return invoker;
}

(6.6.2) requestInvoke()

/**
 * We override the standard invoke method here to queue the invocation for
 * invoker service instead of immediately invoking. The thread pool takes care
 * of the rest.
 */
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
  try {
    // wrap the frame in a Runnable invocation
    Runnable invocation = getRunnable(frameBuffer);
    // 
    invoker.execute(invocation);
    return true;
  } catch (RejectedExecutionException rx) {
    LOGGER.warn("ExecutorService rejected execution!", rx);
    return false;
  }
}

(6.6.3) waitForShutdown()

/**
 * {@inheritDoc}
 */
@Override
protected void waitForShutdown() {
  joinSelector();
  gracefullyShutdownInvokerPool();
}
protected void gracefullyShutdownInvokerPool() {
  // try to gracefully shut down the executor service
  invoker.shutdown();

  // Loop until awaitTermination finally does return without a interrupted
  // exception. If we don't do this, then we'll shut down prematurely. We want
  // to let the executorService clear it's task queue, closing client sockets
  // appropriately.
  long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
  long now = System.currentTimeMillis();
  while (timeoutMS >= 0) {
    try {
      // wait for the executor to terminate gracefully
      // 
      invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
      break;
    } catch (InterruptedException ix) {
      long newnow = System.currentTimeMillis();
      timeoutMS -= (newnow - now);
      now = newnow;
    }
  }
}

(6.7) TThreadedSelectorServer

/**
 * A Half-Sync/Half-Async server with a separate pool of threads to handle
 * non-blocking I/O. Accepts are handled on a single thread, and a configurable
 * number of nonblocking selector threads manage reading and writing of client
 * connections. A synchronous worker thread pool handles processing of requests.
 *
 * Performs better than TNonblockingServer/THsHaServer in multi-core
 * environments when the the bottleneck is CPU on the single selector thread
 * handling I/O. In addition, because the accept handling is decoupled from
 * reads/writes and invocation, the server has better ability to handle back-
 * pressure from new connections (e.g. stop accepting when busy).
 *
 * Like TNonblockingServer, it relies on the use of TFramedTransport.
 */
public class TThreadedSelectorServer extends AbstractNonblockingServer {

  // This wraps all the functionality of queueing and thread pool management
  // for the passing of Invocations from the selector thread(s) to the workers
  // (if any).
  private final ExecutorService invoker;

  private final Args args;

  /**
   * Create the server with the specified Args configuration
   */
  public TThreadedSelectorServer(Args args) {
    super(args);
    args.validate();
    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
    this.args = args;
  }

}

The serve() flow:

  1. startThreads();
  2. startListening();
  3. setServing(true);
  4. waitForShutdown();
  5. setServing(false);
  6. stopListening();

(6.7.1) startThreads()

/**
 * Start the accept and selector threads running to deal with clients.
 *
 * @return true if everything went ok, false if we couldn't start for some
 *         reason.
 */
@Override
protected boolean startThreads() {
  try {
    for (int i = 0; i < args.selectorThreads; ++i) {
      selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
    }
    acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
      createSelectorThreadLoadBalancer(selectorThreads));
    for (SelectorThread thread : selectorThreads) {
      // 
      thread.start();
    }
    // 
    acceptThread.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start threads!", e);
    return false;
  }
}

(6.7.2) SelectorThread

/**
 * The SelectorThread(s) will be doing all the selecting on accepted active
 * connections.
 */
protected class SelectorThread extends AbstractSelectThread {

  // Accepted connections added by the accept thread.
  private final BlockingQueue<TNonblockingTransport> acceptedQueue;
  private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
  private long MONITOR_PERIOD = 1000L;
  private int jvmBug = 0;

  /**
   * Set up the SelectorThread with an unbounded queue for incoming accepts.
   *
   * @throws IOException
   *           if a selector cannot be created
   */
  public SelectorThread() throws IOException {
    this(new LinkedBlockingQueue<TNonblockingTransport>());
  }

}

SelectorThread::run()

/**
 * The work loop. Handles selecting (read/write IO), dispatching, and
 * managing the selection preferences of all existing connections.
 */
public void run() {
  try {
    while (!stopped_) {
      // 
      select();
      // 
      processAcceptedConnections();
      // 
      processInterestChanges();
    }
    for (SelectionKey selectionKey : selector.keys()) {
      cleanupSelectionKey(selectionKey);
    }
  } catch (Throwable t) {
    LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
  } finally {
    try {
      selector.close();
    } catch (IOException e) {
      LOGGER.error("Got an IOException while closing selector!", e);
    }
    // This will wake up the accept thread and the other selector threads
    TThreadedSelectorServer.this.stop();
  }
}

SelectorThread::select()

/**
 * Select and process IO events appropriately: If there are existing
 * connections with data waiting to be read, read it, buffering until a
 * whole frame has been read. If there are any pending responses, buffer
 * them until their target client is available, and then send the data.
 */
private void select() {
  try {

    doSelect();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

doSelect()

/**
 * Do select and judge epoll bug happen.
 * See : https://issues.apache.org/jira/browse/THRIFT-4251
 */
private void doSelect() throws IOException {
  long beforeSelect = System.currentTimeMillis();
  int selectedNums = selector.select();
  long afterSelect = System.currentTimeMillis();

  if (selectedNums == 0) {
    jvmBug++;
  } else {
    jvmBug = 0;
  }

  long selectedTime = afterSelect - beforeSelect;
  if (selectedTime >= MONITOR_PERIOD) {
    jvmBug = 0;
  } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
    LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
    rebuildSelector();
    selector.selectNow();
    jvmBug = 0;
  }

}

rebuildSelector()

/**
 * Replaces the current Selector of this SelectorThread with newly created Selector to work
 * around the infamous epoll 100% CPU bug.
 */
private synchronized void rebuildSelector() {
  final Selector oldSelector = selector;
  if (oldSelector == null) {
    return;
  }
  Selector newSelector = null;
  try {
    newSelector = Selector.open();
    LOGGER.warn("Created new Selector.");
  } catch (IOException e) {
    LOGGER.error("Create new Selector error.", e);
  }

  for (SelectionKey key : oldSelector.selectedKeys()) {
    if (!key.isValid() && key.readyOps() == 0)
      continue;
    SelectableChannel channel = key.channel();
    Object attachment = key.attachment();

    try {
      if (attachment == null) {
        channel.register(newSelector, key.readyOps());
      } else {
        channel.register(newSelector, key.readyOps(), attachment);
      }
    } catch (ClosedChannelException e) {
      LOGGER.error("Register new selector key error.", e);
    }

  }

  selector = newSelector;
  try {
    oldSelector.close();
  } catch (IOException e) {
    LOGGER.error("Close old selector error.", e);
  }
  LOGGER.warn("Replace new selector success.");
}

processAcceptedConnections()

private void processAcceptedConnections() {
  // Register accepted connections
  while (!stopped_) {
    TNonblockingTransport accepted = acceptedQueue.poll();
    if (accepted == null) {
      break;
    }
    registerAccepted(accepted);
  }
}

registerAccepted()

private void registerAccepted(TNonblockingTransport accepted) {
  SelectionKey clientKey = null;
  try {
    clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

    FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);

    clientKey.attach(frameBuffer);
  } catch (IOException | TTransportException e) {
    LOGGER.warn("Failed to register accepted connection to selector!", e);
    if (clientKey != null) {
      cleanupSelectionKey(clientKey);
    }
    accepted.close();
  }
}

processInterestChanges()

processInterestChanges() comes from the parent class, AbstractNonblockingServer:

package org.apache.thrift.server;
// class AbstractNonblockingServer

    /**
     * Check to see if there are any FrameBuffers that have switched their
     * interest type from read to write or vice versa.
     */
    protected void processInterestChanges() {
      synchronized (selectInterestChanges) {
        for (FrameBuffer fb : selectInterestChanges) {
          fb.changeSelectInterests();
        }
        selectInterestChanges.clear();
      }
    }

AcceptThread

/**
 * The thread that selects on the server transport (listen socket) and accepts
 * new connections to hand off to the IO selector threads
 */
protected class AcceptThread extends Thread {

  // The listen socket to accept on
  private final TNonblockingServerTransport serverTransport;
  private final Selector acceptSelector;

  private final SelectorThreadLoadBalancer threadChooser;

  /**
   * Set up the AcceptThead
   *
   * @throws IOException
   */
  public AcceptThread(TNonblockingServerTransport serverTransport,
      SelectorThreadLoadBalancer threadChooser) throws IOException {
    this.serverTransport = serverTransport;
    this.threadChooser = threadChooser;
    this.acceptSelector = SelectorProvider.provider().openSelector();
    this.serverTransport.registerSelector(acceptSelector);
  }

}

AcceptThread::run()

/**
 * The work loop. Selects on the server transport and accepts. If there was
 * a server transport that had blocking accepts, and returned on blocking
 * client transports, that should be used instead
 */
public void run() {
  try {
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }

    while (!stopped_) {
      select();
    }
  } catch (Throwable t) {
    LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
  } finally {
    try {
      acceptSelector.close();
    } catch (IOException e) {
      LOGGER.error("Got an IOException while closing accept selector!", e);
    }
    // This will wake up the selector threads
    TThreadedSelectorServer.this.stop();
  }
}

AcceptThread::select()

/**
 * Select and process IO events appropriately: If there are connections to
 * be accepted, accept them.
 */
private void select() {
  try {
    // wait for connect events.
    acceptSelector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        continue;
      }

      if (key.isAcceptable()) {
        // handle the accept
        handleAccept();
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

handleAccept()

/**
 * Accept a new connection.
 */
private void handleAccept() {
  final TNonblockingTransport client = doAccept();
  if (client != null) {
    // Pass this connection to a selector thread
    final SelectorThread targetThread = threadChooser.nextThread();

    if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
      doAddAccept(targetThread, client);
    } else {
      // FAIR_ACCEPT
      try {
        invoker.submit(new Runnable() {
          public void run() {
            doAddAccept(targetThread, client);
          }
        });
      } catch (RejectedExecutionException rx) {
        LOGGER.warn("ExecutorService rejected accept registration!", rx);
        // close immediately
        client.close();
      }
    }
  }
}
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
  if (!thread.addAcceptedConnection(client)) {
    client.close();
  }
}

requestInvoke()

/**
 * We override the standard invoke method here to queue the invocation for
 * invoker service instead of immediately invoking. If there is no thread
 * pool, handle the invocation inline on this thread
 */
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
  // 
  Runnable invocation = getRunnable(frameBuffer);
  if (invoker != null) {
    try {
      // 
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  } else {
    // Invoke on the caller's thread
    invocation.run();
    return true;
  }
}
protected Runnable getRunnable(FrameBuffer frameBuffer) {
  return new Invocation(frameBuffer);
}

waitForShutdown()

/**
 * Joins the accept and selector threads and shuts down the executor service.
 */
@Override
protected void waitForShutdown() {
  try {
    joinThreads();
  } catch (InterruptedException e) {
    // Non-graceful shutdown occurred
    LOGGER.error("Interrupted while joining threads!", e);
  }
  gracefullyShutdownInvokerPool();
}
protected void joinThreads() throws InterruptedException {
  // wait until the io threads exit
  acceptThread.join();
  for (SelectorThread thread : selectorThreads) {
    thread.join();
  }
}
protected void gracefullyShutdownInvokerPool() {
  // try to gracefully shut down the executor service
  invoker.shutdown();

  // Loop until awaitTermination finally does return without a interrupted
  // exception. If we don't do this, then we'll shut down prematurely. We want
  // to let the executorService clear it's task queue, closing client sockets
  // appropriately.
  long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
  long now = System.currentTimeMillis();
  while (timeoutMS >= 0) {
    try {
      invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
      break;
    } catch (InterruptedException ix) {
      long newnow = System.currentTimeMillis();
      timeoutMS -= (newnow - now);
      now = newnow;
    }
  }
}

stop()

/**
 * Stop serving and shut everything down.
 */
@Override
public void stop() {
  stopped_ = true;

  // Stop queuing connect attempts asap
  stopListening();

  if (acceptThread != null) {
    acceptThread.wakeupSelector();
  }
  if (selectorThreads != null) {
    for (SelectorThread thread : selectorThreads) {
      if (thread != null)
        thread.wakeupSelector();
    }
  }
}

Problems

(1) read tcp i/o timeout

thrift server start  listened_addr:8081

17:22:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:57899: i/o timeout
17:22:14 errmsg=read tcp 127.0.0.1:8081->127.0.0.1:57899: i/o timeout
17:22:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:57899: i/o timeout

17:26:40 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:59781: i/o timeout
17:26:40 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:59781: i/o timeout

The server timed out while reading from the connection. This typically happens when the accepted socket has a read deadline (the clientTimeout on the TServerSocket) and the client holds the connection open without sending anything within that window, for example an idle connection sitting in a client-side pool.

(2) write: broken pipe

errmsg=write tcp 10.157.121.37:8300->10.159.184.191:57066: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:58842: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50344: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:38252: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22356: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50002: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:32572: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59450: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59320: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:41668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:36668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:60856: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:49126: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:30506: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:55618: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22438: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:43892: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:26776: write: broken pipe

This one is interesting: the calls go client->server, and the client keeps a connection pool that cached the server's IP, so the server ends up writing responses into connections the client side has already closed, which surfaces as broken pipe.

(3) Incorrect frame size

2022/11/18 14:46:43 error processing request: Incorrect frame size (1347375956)
2022/11/18 14:46:43 error processing request: Incorrect frame size (1347375956)

1347375956 is 0x504F5354, the ASCII bytes "POST": the four bytes the framed transport read as a frame size were actually the start of an HTTP request, i.e. a non-Thrift (or non-framed) client hit the Thrift port.

(4) connection reset by peer

2022/11/18 16:31:05 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:62781: read: connection reset by peer

2022/11/18 16:31:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:62635: read: connection reset by peer
2022/11/18 16:31:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:62706: read: connection reset by peer

This came up while stepping through the thrift server in a debugger: the client's request had long since timed out and the client had closed the connection, so when the server went back to the connection to read or write, it got connection reset by peer.
