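Thrift Server
(1) What the Thrift server does
The server ties all of Thrift's pieces together:
1. Create a Transport.
2. Create the input/output Protocol that runs on top of that Transport.
3. Create a Processor that works on the input/output Protocol.
4. Start the service and wait for client connections.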
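The four steps can be sketched with nothing but the JDK. This is not the Thrift API: a `ServerSocket` stands in for the Transport, a newline-delimited text format for the Protocol, and a `Function<String, String>` for the Processor. The names `FourStepServerSketch`, `serveOnce`, and `roundTrip` are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.Function;

final class FourStepServerSketch {

    // Serves exactly one connection: read one line, run the processor, write the reply.
    static void serveOnce(ServerSocket transport, Function<String, String> processor) throws IOException {
        try (Socket client = transport.accept(); // step 4: wait for a client
             // step 2: the "protocol" is one UTF-8 line in, one line out
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String request = in.readLine();
            if (request != null) {
                out.println(processor.apply(request)); // step 3: the processor handles the call
            }
        }
    }

    // Starts a server on an ephemeral loopback port, sends one request, returns the reply.
    static String roundTrip(String request) {
        // step 1: create the server-side transport
        try (ServerSocket transport = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> {
                try {
                    serveOnce(transport, s -> s.toUpperCase(Locale.ROOT));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            server.start();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), transport.getLocalPort());
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                out.println(request);
                String reply = in.readLine();
                server.join();
                return reply;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

A real Thrift server differs mainly in that the protocol and processor are generated from the IDL and that the serving loop keeps accepting connections instead of stopping after one.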
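The server models on offer differ from one Thrift language implementation to another.
The Java library offers several server models: TSimpleServer, TThreadPoolServer, TNonblockingServer, THsHaServer, and TThreadedSelectorServer.
The Go library offers only TSimpleServer.
IO model | Java | Go | Characteristics |
---|---|---|---|
Blocking I/O | TSimpleServer | - | A single worker thread loops, accepting a connection and then serving its requests. It can accept and serve only one socket connection at a time, so throughput is low. |
Blocking I/O | TThreadPoolServer | TSimpleServer | One accept loop; each accepted connection is served with blocking I/O by a pool thread (Java) or by its own goroutine (Go). |
I/O multiplexing | TNonblockingServer | - | Single-threaded and built on NIO: every socket is registered with one selector. |
Classified by whether the I/O blocks, there are blocking and non-blocking server models.
Blocking server models: TSimpleServer and TThreadPoolServer.
Non-blocking server models: TNonblockingServer, THsHaServer, and TThreadedSelectorServer.
Classified by network service model, there are three kinds: single thread or goroutine, multiple threads or goroutines, and event-driven.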
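(2) Server startup and request handling
- Server initialization
1.1 Create the ServerHandler (calculatorHandler in the demo).
1.2 Register the processor functions for the ServerHandler; each of them implements the thrift.TProcessorFunction interface.
1.3 Create the ServerTransport (the part of the server that listens and accepts).
1.4 Create the Server from the protocol factory (protocolFactory), the transport factory (transportFactory), the server transport (ServerTransport), and the service handler (ServerHandler).
- Server startup
2.1 Start the server and listen on the configured address and port.
2.2
2.3 Loop, handling requests.
- Request handling
3.1 Look up the matching processor among those registered for the ServerHandler.
- Server shutdown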
(3) Demo: Go
Source: https://github.com/weikeqin/thrift-tutorial-go-demo
func main() {
addr := flag.String("addr", "localhost:9090", "Address to listen to")
var protocolFactory thrift.TProtocolFactory
var transportFactory thrift.TTransportFactory
var transport thrift.TServerTransport
// Some code omitted here
// CalculatorHandler implements the Calculator interface
calculatorHandler := NewCalculatorHandler()
// CalculatorProcessor implements the TProcessor interface
calculatorProcessor := tutorial.NewCalculatorProcessor(calculatorHandler)
// Create the TSimpleServer
server := thrift.NewTSimpleServer4(calculatorProcessor, transport, transportFactory, protocolFactory)
// addr is a *string, so dereference it when printing
fmt.Println("Starting the simple server... on ", *addr)
// Start the server; Serve blocks until the server stops
err := server.Serve()
if err != nil {
fmt.Println(err)
}
fmt.Println("server stopped")
}
(4) Source analysis: Go
(4.1) TSimpleServer
Start with the TSimpleServer struct. It holds the closed flag, a wait group for in-flight connections, a mutex, the processor factory, the server transport, and the input and output transport and protocol factories.
/*
* This is not a typical TSimpleServer as it is not blocked after accept a socket.
* It is more like a TThreadedServer that can handle different connections in different goroutines.
* This will work if golang user implements a conn-pool like thing in client side.
*/
type TSimpleServer struct {
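closed int32 // 0 = serving, 1 = stopped; read and written atomically
wg sync.WaitGroup // tracks in-flight connection goroutines
mu sync.Mutex // serializes accept bookkeeping with Stop
processorFactory TProcessorFactory // processor factory
serverTransport TServerTransport // server transport: listens for and accepts connections
inputTransportFactory TTransportFactory // input transport factory
outputTransportFactory TTransportFactory // output transport factory
inputProtocolFactory TProtocolFactory // input protocol factory
outputProtocolFactory TProtocolFactory // output protocol factory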
// Headers to auto forward in THeaderProtocol
forwardHeaders []string
logger Logger
}
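(4.1.1) Serve
Serve starts the server. It does two things:
1. Listen on the configured address and port.
2. Enter the accept loop, which blocks while it accepts and handles client connections.
// Serve listens and then runs the accept loop until the server stops.
func (p *TSimpleServer) Serve() error {
p.logger = fallbackLogger(p.logger)
// Start listening
err := p.Listen()
if err != nil {
return err
}
// Blocks here: accept and handle client connections in a loop
p.AcceptLoop()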
return nil
}
(4.1.2) Listen()
// Listen starts listening on the server transport.
func (p *TSimpleServer) Listen() error {
return p.serverTransport.Listen()
}
(4.1.3) AcceptLoop()
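// AcceptLoop accepts connections until an error occurs or the server is closed.
func (p *TSimpleServer) AcceptLoop() error {
for {
// Accept one connection
closed, err := p.innerAccept()
// An error here can only come from serverTransport.Accept
if err != nil {
// Leave the loop and return the error
return err
}
// closed == 0 means the server is still serving
if closed != 0 {
// The server has been stopped: leave the loop
return nil
}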
}
}
(4.1.4) innerAccept
func (p *TSimpleServer) innerAccept() (int32, error) {
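// Accept a connection; blocks here until one arrives
client, err := p.serverTransport.Accept()
// Hold the mutex so that Stop cannot run between the closed check and wg.Add
p.mu.Lock()
defer p.mu.Unlock()
// Read the closed flag
closed := atomic.LoadInt32(&p.closed)
// closed != 0 means the server has been stopped
if closed != 0 {
return closed, nil
}
// Did Accept fail?
if err != nil {
return 0, err
}
// client is the transport for the accepted connection
if client != nil {
p.wg.Add(1)
// Handle the connection in its own goroutine
go func() {
defer p.wg.Done()
// Handle requests on this connection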
if err := p.processRequests(client); err != nil {
p.logger(fmt.Sprintf("error processing request: %v", err))
}
}()
}
return 0, nil
}
(4.1.5) processRequests
// processRequests serves every request on one client connection.
func (p *TSimpleServer) processRequests(client TTransport) (err error) {
defer func() {
err = treatEOFErrorsAsNil(err)
}()
// Get the processor
// In the demo this is a *tutorial.CalculatorProcessor
processor := p.processorFactory.GetProcessor(client)
// Input transport
inputTransport, err := p.inputTransportFactory.GetTransport(client)
if err != nil {
return err
}
// Input protocol
inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
var outputTransport TTransport
var outputProtocol TProtocol
// for THeaderProtocol, we must use the same protocol instance for
// input and output so that the response is in the same dialect that
// the server detected the request was in.
headerProtocol, ok := inputProtocol.(*THeaderProtocol)
if ok {
outputProtocol = inputProtocol
} else {
// Output transport
oTrans, err := p.outputTransportFactory.GetTransport(client)
if err != nil {
return err
}
outputTransport = oTrans
// Output protocol
outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport)
}
if inputTransport != nil {
defer inputTransport.Close()
}
if outputTransport != nil {
defer outputTransport.Close()
}
for {
if atomic.LoadInt32(&p.closed) != 0 {
return nil
}
ctx := SetResponseHelper(
defaultCtx,
TResponseHelper{
THeaderResponseHelper: NewTHeaderResponseHelper(outputProtocol),
},
)
if headerProtocol != nil {
// We need to call ReadFrame here, otherwise we won't
// get any headers on the AddReadTHeaderToContext call.
//
// ReadFrame is safe to be called multiple times so it
// won't break when it's called again later when we
// actually start to read the message.
if err := headerProtocol.ReadFrame(ctx); err != nil {
return err
}
ctx = AddReadTHeaderToContext(ctx, headerProtocol.GetReadHeaders())
ctx = SetWriteHeaderList(ctx, p.forwardHeaders)
}
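// Dispatch to the handler method for this request.
// The response is written to the TTransport behind outputProtocol.
ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
// The client has gone away: abandon the request and close the connection
if errors.Is(err, ErrAbandonRequest) {
return client.Close()
}
// Transport-level failure
if errors.As(err, new(TTransportException)) && err != nil {
// The common "read: connection reset by peer" error is returned from here, for example:
// read tcp 127.0.0.1:9090->127.0.0.1:58470: read: connection reset by peer
// EOF also arrives on this path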
return err
}
var tae TApplicationException
if errors.As(err, &tae) && tae.TypeId() == UNKNOWN_METHOD {
continue
}
if !ok {
break
}
}
return nil
}
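(4.1.6) Stop
// Stop marks the server closed, interrupts Accept, and waits for in-flight connections.
func (p *TSimpleServer) Stop() error {
p.mu.Lock() // take the mutex
defer p.mu.Unlock() // release it on return
// Already stopped: return at once (closed == 1 is stopped, closed == 0 is serving)
if atomic.LoadInt32(&p.closed) != 0 {
return nil
}
// Mark the server as stopped
atomic.StoreInt32(&p.closed, 1)
// Interrupt the blocked Accept
p.serverTransport.Interrupt()
// Wait for all in-flight connections to finish
p.wg.Wait()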
return nil
}
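The interplay between innerAccept and Stop (check the closed flag and register the connection under one lock, then have Stop wait for everything in flight) is not specific to Go. Below is an illustrative Java analog, not a port of the Go code: a monitor plays the role of both the mutex and the WaitGroup, and the names `StopCoordinationSketch`, `tryBegin`, and `end` are hypothetical.

```java
final class StopCoordinationSketch {
    private final Object lock = new Object();
    private boolean closed = false; // plays the role of the closed flag
    private int inFlight = 0;       // plays the role of the WaitGroup counter

    // Called by the accept loop for each new connection.
    // Returns false once the server has been stopped.
    boolean tryBegin() {
        synchronized (lock) {
            if (closed) {
                return false;
            }
            inFlight++; // like wg.Add(1), done under the same lock as the closed check
            return true;
        }
    }

    // Called when a connection handler finishes, like wg.Done().
    void end() {
        synchronized (lock) {
            inFlight--;
            lock.notifyAll();
        }
    }

    // Marks the server closed, then blocks until every in-flight handler has ended.
    void stop() throws InterruptedException {
        synchronized (lock) {
            closed = true;
            while (inFlight > 0) {
                lock.wait(); // releases the monitor so end() can run
            }
        }
    }

    // Demo: one handler is in flight when stop() is called.
    static boolean demo() {
        StopCoordinationSketch server = new StopCoordinationSketch();
        if (!server.tryBegin()) {
            return false;
        }
        Thread handler = new Thread(server::end); // "finishes its request", then signals
        handler.start();
        try {
            server.stop(); // returns only after end() has run
            handler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !server.tryBegin(); // new work is refused after stop()
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the closed check and the counter increment happen under one lock, a connection can never be registered after stop() has started waiting.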
(4.2) TServerTransport
// Server transport. Object which provides client transports.
type TServerTransport interface {
Listen() error
Accept() (TTransport, error)
Close() error
// Optional method implementation. This signals to the server transport
// that it should break out of any accept() or listen() that it is currently
// blocked on. This method, if implemented, MUST be thread safe, as it may
// be called from a different thread context than the other TServerTransport
// methods.
Interrupt() error
}
Two types implement the TServerTransport interface: TServerSocket and TSSLServerSocket.
// TServerSocket wraps a net.Listener.
type TServerSocket struct {
listener net.Listener
addr net.Addr
clientTimeout time.Duration
// Protects the interrupted value to make it thread safe.
mu sync.RWMutex
interrupted bool
}
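// Listen starts listening on the configured address; it is a no-op if already listening.
func (p *TServerSocket) Listen() error {
// Take the lock to protect p.listener
p.mu.Lock()
defer p.mu.Unlock()
// Already listening: nothing to do
if p.IsListening() {
return nil
}
// Listen on the configured network and address
l, err := net.Listen(p.addr.Network(), p.addr.String())
if err != nil {
return err
}
// Store the listener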
p.listener = l
return nil
}
(5) Demo: Java
//
(6) Source analysis: Java
(6.1) TServer
package org.apache.thrift.server;
/**
* Generic interface for a Thrift server.
*
*/
public abstract class TServer {
/**
* Core processor
*/
protected TProcessorFactory processorFactory_;
/**
* Server transport
*/
protected TServerTransport serverTransport_;
/**
* Input Transport Factory
*/
protected TTransportFactory inputTransportFactory_;
/**
* Output Transport Factory
*/
protected TTransportFactory outputTransportFactory_;
/**
* Input Protocol Factory
*/
protected TProtocolFactory inputProtocolFactory_;
/**
* Output Protocol Factory
*/
protected TProtocolFactory outputProtocolFactory_;
/**
* Whether the server is currently serving (started or stopped).
* volatile makes a write by one thread visible to the others.
*/
private volatile boolean isServing;
protected TServerEventHandler eventHandler_;
// Flag for stopping the server
// Please see THRIFT-1795 for the usage of this flag
protected volatile boolean stopped_ = false;
/**
* Constructor.
* Copies the processor factory, server transport, input and output transport factories, and input and output protocol factories from args.
*/
protected TServer(AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public static class Args extends AbstractServerArgs<Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
/**
* The run method fires up the server and gets things going.
*/
public abstract void serve();
/**
* Stop the server. This is optional on a per-implementation basis. Not
* all servers are required to be cleanly stoppable.
*/
public void stop() {}
}
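As the listing shows, TServer is an abstract class:
- It declares what a server needs: the processor factory, the server transport (the listening side), the input and output transport factories, and the input and output protocol factories.
- It defines the TServer constructor.
- The startup method serve() is abstract, so every subclass must implement it.
- The shutdown method stop() is an ordinary method with an empty body; subclasses may choose to override it.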
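The shape of that contract (an abstract serve(), an optional stop(), and a volatile stop flag) can be sketched in a few lines. This is an illustration under assumptions, not the Thrift class: `AbstractServerSketch`, `MiniServer`, and `CountingServer` are hypothetical names, and "handling a request" is reduced to incrementing a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class AbstractServerSketch {

    abstract static class MiniServer {
        // volatile so that a stop() from another thread is seen by the serving loop
        protected volatile boolean stopped = false;

        // Every server model must supply its own serving loop.
        public abstract void serve();

        // Optional: the default does nothing, mirroring the empty TServer.stop().
        public void stop() {}
    }

    static final class CountingServer extends MiniServer {
        final AtomicInteger handled = new AtomicInteger();
        private final int limit;

        CountingServer(int limit) {
            this.limit = limit;
        }

        @Override
        public void serve() {
            // Stand-in for "accept and process": count iterations until stopped.
            while (!stopped) {
                if (handled.incrementAndGet() >= limit) {
                    stop();
                }
            }
        }

        @Override
        public void stop() {
            stopped = true;
        }
    }

    public static void main(String[] args) {
        CountingServer server = new CountingServer(3);
        server.serve();
        System.out.println(server.handled.get());
    }
}
```

A subclass that does not override stop() can only be ended from inside its own loop, which is why every concrete Thrift server in this post overrides it.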
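(6.2) TSimpleServer
The Java TSimpleServer uses the simplest model, blocking I/O. Its implementation is short and clear, which makes it a good place to start learning.
It exists to demonstrate how Thrift works and for testing; it is not meant for production.
It can accept and serve only one socket connection at a time, so throughput is low.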
package org.apache.thrift.server;
/**
* Simple singlethreaded server for testing.
*
*/
public class TSimpleServer extends TServer {
public TSimpleServer(AbstractServerArgs args) {
super(args);
}
}
(6.2.1) serve
How TSimpleServer starts and serves:
// Start serving
public void serve() {
try {
// Listen on the configured address and port
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
}
// Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
}
// Mark the server as serving
setServing(true);
// Accept and serve one connection per iteration until stopped
while (!stopped_) {
// Transport for the accepted client connection
TTransport client = null;
// Processor that dispatches to the handler methods
TProcessor processor = null;
// Input transport
TTransport inputTransport = null;
// Output transport
TTransport outputTransport = null;
// Input protocol
TProtocol inputProtocol = null;
// Output protocol
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
try {
// Accept a connection; blocks here
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
// Serve requests on this connection until an exception ends the loop
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
// Dispatch to the handler method for this request
processor.process(inputProtocol, outputProtocol);
}
}
} catch (TTransportException ttx) {
// Client died, just move on
LOGGER.debug("Client Transportation Exception", ttx);
} catch (TException tx) {
if (!stopped_) {
LOGGER.error("Thrift error occurred during processing of message.", tx);
}
} catch (Exception x) {
if (!stopped_) {
LOGGER.error("Error occurred during processing of message.", x);
}
}
// Delete the connection context
if (eventHandler_ != null) {
eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
// Close the input transport
if (inputTransport != null) {
inputTransport.close();
}
// Close the output transport
if (outputTransport != null) {
outputTransport.close();
}
}
// Mark the server as stopped
setServing(false);
}
(6.3) TThreadPoolServer
/**
* Server which uses Java's built in ThreadPool management to spawn off
* a worker pool that deals with client connections in blocking way.
*/
public class TThreadPoolServer extends TServer {
// Executor service for handling client connections
private ExecutorService executorService_;
private final TimeUnit stopTimeoutUnit;
// How long to wait for the pool to terminate on shutdown
private final long stopTimeoutVal;
public TThreadPoolServer(Args args) {
super(args);
stopTimeoutUnit = args.stopTimeoutUnit;
stopTimeoutVal = args.stopTimeoutVal;
executorService_ = args.executorService != null ?
args.executorService : createDefaultExecutorService(args);
}
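/**
* Creates the default thread pool: core size minWorkerThreads, maximum size maxWorkerThreads,
* a 60-second keep-alive for idle workers, and a SynchronousQueue (direct hand-off, no buffering).
*/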
private static ExecutorService createDefaultExecutorService(Args args) {
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactory() {
final AtomicLong count = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(String.format("TThreadPoolServer WorkerProcess-%d", count.getAndIncrement()));
return thread;
}
});
}
}
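(6.3.1) serve()
/**
* Start the server, loop handling connections, then shut the pool down.
*/
public void serve() {
// Listen and run the preServe event; give up if listening fails
if (!preServe()) {
return;
}
// Blocks here until the server is stopped
execute();
// shutdownNow() rejects new tasks and interrupts the workers; it does not wait for them.
// A worker sees the interrupt flag after it finishes its current request.
executorService_.shutdownNow();
// Wait, up to the stop timeout, for the workers to finish
if (!waitForShutdown()) {
LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
}
// Mark the server as stopped
setServing(false);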
}
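The call to shutdownNow() only works as a stop signal because each worker checks its interrupt flag between requests. A minimal sketch of that cooperation, using only java.util.concurrent; the class name `InterruptAwareWorkerSketch` is hypothetical and the "request" is a counter increment standing in for processor.process(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class InterruptAwareWorkerSketch {

    // Runs one worker that "processes requests" until its thread is interrupted,
    // then reports whether the pool terminated within the timeout.
    static boolean stopsAfterShutdownNow() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicInteger processed = new AtomicInteger();
        pool.execute(() -> {
            started.countDown();
            // Same shape as the worker loop: check the flag between requests.
            while (!Thread.currentThread().isInterrupted()) {
                processed.incrementAndGet(); // stand-in for handling one request
            }
        });
        try {
            started.await();
            pool.shutdownNow(); // interrupts the worker; does not wait for it
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterShutdownNow());
    }
}
```

With shutdown() instead of shutdownNow() the worker above would never be interrupted and the pool would not terminate, since the loop has no other exit.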
/**
* Pre-serve step: start listening and fire the preServe event.
*/
protected boolean preServe() {
try {
// Listen on the configured address and port
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return false;
}
// Run the preServe event
if (eventHandler_ != null) {
eventHandler_.preServe();
}
stopped_ = false;
// Mark the server as serving
setServing(true);
return true;
}
/**
* Accept loop: hands each accepted connection to the thread pool.
*/
protected void execute() {
// Loop until the server is stopped
while (!stopped_) {
try {
// Accept a client connection
TTransport client = serverTransport_.accept();
try {
// Wrap the connection in a task and hand it to the thread pool
executorService_.execute(new WorkerProcess(client));
} catch (RejectedExecutionException ree) {
// The pool is saturated and rejected the task
if (!stopped_) {
LOGGER.warn("ThreadPool is saturated with incoming requests. Closing latest connection.");
}
// Close the connection to this client
client.close();
}
} catch (TTransportException ttx) {
// Accept failed: there is no client to clean up
if (!stopped_) { // still serving, so log a warning
LOGGER.warn("Transport error occurred during acceptance of message", ttx);
}
}
}
}
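The RejectedExecutionException branch follows directly from pairing a ThreadPoolExecutor with a SynchronousQueue: the queue holds nothing, so once every thread up to the maximum is busy, the next task is rejected. A small sketch of that behaviour with the JDK alone; `SaturatedPoolSketch` and `countRejections` are hypothetical names, and a latch simulates connections that stay open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SaturatedPoolSketch {

    // Submits `attempts` long-running tasks to a pool capped at `maxThreads`
    // and returns how many of them were rejected.
    static int countRejections(int maxThreads, int attempts) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a connection that stays open
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException ree) {
                    rejected++; // the server closes the client connection at this point
                }
            }
        } finally {
            release.countDown(); // let the simulated connections finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejections(2, 5));
    }
}
```

In practice this means maxWorkerThreads is a hard cap on concurrently served connections for this server model.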
/**
* Waits for the thread pool to terminate, giving running tasks up to the stop timeout to finish.
*/
protected boolean waitForShutdown() {
// Loop until awaitTermination finally does return without a interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
//
return executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
return false;
}
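The loop in waitForShutdown() treats an InterruptedException as "time has passed" and keeps waiting with whatever budget is left. The same pattern can be isolated and tried against ordinary executors; `AwaitTerminationSketch` and its helper methods are hypothetical names, and the timeouts are arbitrary values chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AwaitTerminationSketch {

    // Waits up to timeoutMs for the pool to terminate. An InterruptedException
    // shortens the remaining budget instead of aborting the wait.
    static boolean awaitQuietly(ExecutorService pool, long timeoutMs) {
        long remaining = timeoutMs;
        long last = System.currentTimeMillis();
        while (remaining >= 0) {
            try {
                return pool.awaitTermination(remaining, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ix) {
                long now = System.currentTimeMillis();
                remaining -= (now - last);
                last = now;
            }
        }
        return false;
    }

    // An idle pool terminates as soon as it is shut down.
    static boolean idlePoolTerminates() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        return awaitQuietly(pool, 1000);
    }

    // A pool whose only task is still blocked does not terminate within the budget.
    static boolean busyPoolTimesOut() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown(); // graceful: the running task is not interrupted
        boolean terminated = awaitQuietly(pool, 50);
        release.countDown(); // let the task finish so the pool can exit
        return !terminated;
    }

    public static void main(String[] args) {
        System.out.println(idlePoolTerminates() + " " + busyPoolTimesOut());
    }
}
```

Note that the loop swallows the interrupt; callers that need to propagate it would have to restore the flag themselves.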
/**
* Task that serves one client connection on a pool thread.
*/
private class WorkerProcess implements Runnable {
/**
* Client that this services.
*/
private TTransport client_;
/**
* Default constructor.
*
* @param client Transport to process
*/
private WorkerProcess(TTransport client) {
client_ = client;
}
/**
* Loops on processing a client forever
*/
public void run() {
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
Optional<TServerEventHandler> eventHandler = Optional.empty();
ServerContext connectionContext = null;
try {
// Get the processor
processor = processorFactory_.getProcessor(client_);
// Input transport
inputTransport = inputTransportFactory_.getTransport(client_);
// Output transport
outputTransport = outputTransportFactory_.getTransport(client_);
// Input protocol
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
// Output protocol
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// Event handler
eventHandler = Optional.ofNullable(getEventHandler());
if (eventHandler.isPresent()) {
connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
}
while (true) {
// Stop if this thread has been interrupted
if (Thread.currentThread().isInterrupted()) {
LOGGER.debug("WorkerProcess requested to shutdown");
break;
}
if (eventHandler.isPresent()) {
eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
}
// Core processing step
// This process cannot be interrupted by Interrupting the Thread. This
// will return once a message has been processed or the socket timeout
// has elapsed, at which point it will return and check the interrupt
// state of the thread.
processor.process(inputProtocol, outputProtocol);
}
} catch (Exception x) {
LOGGER.debug("Error processing request", x);
// We'll usually receive RuntimeException types here
// Need to unwrap to ascertain real causing exception before we choose to ignore
// Ignore err-logging all transport-level/type exceptions
if (!isIgnorableException(x)) {
// Log the exception at error level and continue
LOGGER.error((x instanceof TException ? "Thrift " : "") + "Error occurred during processing of message.", x);
}
} finally {
if (eventHandler.isPresent()) {
eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
}
// Close the input transport
if (inputTransport != null) {
inputTransport.close();
}
// Close the output transport
if (outputTransport != null) {
outputTransport.close();
}
// Close the client connection
if (client_.isOpen()) {
client_.close();
}
}
}
/**
* Decides whether an exception can be ignored.
*
* Timeouts and end-of-file (the peer closed the connection) are ignorable.
*/
private boolean isIgnorableException(Exception x) {
TTransportException tTransportException = null;
if (x instanceof TTransportException) {
tTransportException = (TTransportException) x;
} else if (x.getCause() instanceof TTransportException) {
tTransportException = (TTransportException) x.getCause();
}
if (tTransportException != null) {
switch(tTransportException.getType()) {
// End of stream: the peer closed the connection
case TTransportException.END_OF_FILE:
// Timed out
case TTransportException.TIMED_OUT:
return true;
}
}
return false;
}
}
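The unwrap-then-classify logic of isIgnorableException can be exercised without Thrift on the classpath. In the sketch below `TransportError` is a hypothetical stand-in for TTransportException, and its type codes are arbitrary values chosen for the example, not the library's constants.

```java
final class IgnorableErrorSketch {

    // Hypothetical stand-in for a transport exception that carries a type code.
    static final class TransportError extends Exception {
        static final int OTHER = 0;
        static final int TIMED_OUT = 1;
        static final int END_OF_FILE = 2;

        final int type;

        TransportError(int type) {
            super("transport error type " + type);
            this.type = type;
        }
    }

    // True when x is, or directly wraps, a timeout or end-of-file transport error.
    static boolean isIgnorable(Exception x) {
        TransportError te = null;
        if (x instanceof TransportError) {
            te = (TransportError) x;
        } else if (x.getCause() instanceof TransportError) {
            te = (TransportError) x.getCause(); // unwrap one level only
        }
        if (te == null) {
            return false;
        }
        switch (te.type) {
            case TransportError.END_OF_FILE:
            case TransportError.TIMED_OUT:
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isIgnorable(new TransportError(TransportError.END_OF_FILE)));
        System.out.println(isIgnorable(new RuntimeException("boom")));
    }
}
```

Only one level of wrapping is inspected, so a transport error nested two causes deep would still be logged at error level.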
(6.4) AbstractNonblockingServer
/**
* Provides common methods and classes used by nonblocking TServer
* implementations.
*/
public abstract class AbstractNonblockingServer extends TServer {
/**
* The maximum amount of memory we will allocate to client IO buffers at a
* time. Without this limit, the server will gladly allocate client buffers
* right into an out of memory exception, rather than waiting.
*/
final long MAX_READ_BUFFER_BYTES;
/**
* How many bytes are currently allocated to read buffers.
*/
final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
}
}
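The pair MAX_READ_BUFFER_BYTES and readBufferBytesAllocated amounts to a memory budget: a frame is only read when its size still fits. One way such a budget could be kept is sketched below. This is an assumption-laden illustration, not the library's code; it uses a compare-and-set loop, and `ReadBufferBudgetSketch`, `tryReserve`, and `release` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ReadBufferBudgetSketch {
    private final long maxBytes;
    private final AtomicLong allocated = new AtomicLong(0);

    ReadBufferBudgetSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Reserves frameSize bytes if the budget allows it. On false the caller
    // should leave the frame unread and try again on a later pass.
    boolean tryReserve(long frameSize) {
        while (true) {
            long current = allocated.get();
            if (current + frameSize > maxBytes) {
                return false;
            }
            if (allocated.compareAndSet(current, current + frameSize)) {
                return true;
            }
        }
    }

    // Returns the bytes of a frame whose buffer is no longer needed.
    void release(long frameSize) {
        allocated.addAndGet(-frameSize);
    }

    long bytesAllocated() {
        return allocated.get();
    }

    public static void main(String[] args) {
        ReadBufferBudgetSketch budget = new ReadBufferBudgetSketch(100);
        System.out.println(budget.tryReserve(60)); // fits
        System.out.println(budget.tryReserve(60)); // would exceed the limit
    }
}
```

Deferring the read instead of failing it is what lets a busy server wait for memory rather than run into an out-of-memory error.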
(6.4.1) serve()
/**
* Begin accepting connections and processing invocations.
*/
public void serve() {
// start any IO threads
if (!startThreads()) {
return;
}
// start listening, or exit
if (!startListening()) {
return;
}
// Mark the server as serving
setServing(true);
// this will block while we serve
waitForShutdown();
// Mark the server as no longer serving
setServing(false);
// do a little cleanup
stopListening();
}
/**
* Starts any threads required for serving.
*
* @return true if everything went ok, false if threads could not be started.
*/
protected abstract boolean startThreads();
/**
* A method that will block until when threads handling the serving have been
* shut down.
*/
protected abstract void waitForShutdown();
/**
* Have the server transport start accepting connections.
*
* @return true if we started listening successfully, false if something went
* wrong.
*/
protected boolean startListening() {
try {
// Listen on the configured address and port
serverTransport_.listen();
return true;
} catch (TTransportException ttx) {
LOGGER.error("Failed to start listening on server socket!", ttx);
return false;
}
}
/**
* Stop listening for connections.
*/
protected void stopListening() {
serverTransport_.close();
}
/**
* Possible states for the FrameBuffer state machine.
*/
private enum FrameBufferState {
// in the midst of reading the frame size off the wire
READING_FRAME_SIZE,
// reading the actual frame data now, but not all the way done yet
READING_FRAME,
// completely read the frame, so an invocation can now happen
READ_FRAME_COMPLETE,
// waiting to get switched to listening for write events
AWAITING_REGISTER_WRITE,
// started writing response data, not fully complete yet
WRITING,
// another thread wants this framebuffer to go back to reading
AWAITING_REGISTER_READ,
// we want our transport and selection key invalidated in the selector
// thread
AWAITING_CLOSE
}
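The first two states exist because a non-blocking read may deliver a frame in arbitrary pieces: first the 4-byte size must be collected, then exactly that many body bytes. A minimal sketch of just the read side, assuming a 4-byte big-endian length prefix; `FrameReaderSketch` is a hypothetical class with two states instead of seven, and it omits the maximum-frame-size check a real server needs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FrameReaderSketch {
    enum State { READING_FRAME_SIZE, READING_FRAME }

    private State state = State.READING_FRAME_SIZE;
    private ByteBuffer buffer = ByteBuffer.allocate(4); // collects the 4-byte size first

    // Consumes one chunk of bytes and returns the frames it completed.
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(chunk);
        while (true) {
            int n = Math.min(in.remaining(), buffer.remaining());
            for (int i = 0; i < n; i++) {
                buffer.put(in.get());
            }
            if (buffer.hasRemaining()) {
                return frames; // size or body still incomplete: wait for more bytes
            }
            if (state == State.READING_FRAME_SIZE) {
                buffer.flip();
                int size = buffer.getInt();
                if (size < 0) {
                    throw new IllegalArgumentException("negative frame size: " + size);
                }
                buffer = ByteBuffer.allocate(size);
                state = State.READING_FRAME;
            } else {
                frames.add(buffer.array()); // frame complete: an invocation could happen now
                buffer = ByteBuffer.allocate(4);
                state = State.READING_FRAME_SIZE;
            }
        }
    }

    // Encodes the messages as frames, feeds them chunkSize bytes at a time, decodes the result.
    static List<String> roundTrip(int chunkSize, String... messages) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int total = 0;
        for (String m : messages) {
            total += 4 + m.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer wire = ByteBuffer.allocate(total);
        for (String m : messages) {
            byte[] body = m.getBytes(StandardCharsets.UTF_8);
            wire.putInt(body.length).put(body);
        }
        byte[] bytes = wire.array();
        FrameReaderSketch reader = new FrameReaderSketch();
        List<String> decoded = new ArrayList<>();
        for (int off = 0; off < bytes.length; off += chunkSize) {
            byte[] chunk = Arrays.copyOfRange(bytes, off, Math.min(bytes.length, off + chunkSize));
            for (byte[] frame : reader.feed(chunk)) {
                decoded.add(new String(frame, StandardCharsets.UTF_8));
            }
        }
        return decoded;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(3, "ab", "cde"));
    }
}
```

Without the length prefix the reader could not tell where one call ends and the next begins, which is the reason the non-blocking servers require a framed transport.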
(6.5) TNonblockingServer
package org.apache.thrift.server;
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
* connected clients in terms of invocations.
*
* This server is inherently single-threaded. If you want a limited thread pool
* coupled with invocation-fairness, see THsHaServer.
*
* To use this server, you MUST use a TFramedTransport at the outermost
* transport, otherwise this server will be unable to determine when a whole
* method call has been read off the wire. Clients must also use TFramedTransport.
*/
public class TNonblockingServer extends AbstractNonblockingServer {
// The thread that selects and accepts
private SelectAcceptThread selectAcceptThread_;
public TNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
}
}
The important methods of TNonblockingServer are TNonblockingServer(), startThreads(), requestInvoke(), waitForShutdown(), joinSelector(), isStopped(), and stop(); its important inner class is SelectAcceptThread.
(6.5.1) startThreads()
/**
* Start the selector thread to deal with accepts and client messages.
*
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
@Override
protected boolean startThreads() {
// start the selector
try {
// Create the select/accept thread
selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
// Start it
selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
(6.5.2) SelectAcceptThread (key class)
/**
* The thread that will be doing all the selecting, managing new connections
* and those that still need to be read.
*/
protected class SelectAcceptThread extends AbstractSelectThread {
// The server transport on which new client transports will be accepted
private final TNonblockingServerTransport serverTransport;
/**
* Set up the thread that will handle the non-blocking accepts, reads, and
* writes.
*/
public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
serverTransport.registerSelector(selector);
}
}
The important members of SelectAcceptThread are its constructor, run(), select(), and handleAccept().
run()
/**
* The work loop. Handles both selecting (all IO operations) and managing
* the selection preferences of all existing connections.
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
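// Loop until the server is stopped
while (!stopped_) {
// Select and handle IO events
select();
// Apply interest-set changes requested by the frame buffers
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
// Clean up each remaining key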
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
stopped_ = true;
}
}
select()
/**
* Select and process IO events appropriately:
* If there are connections to be accepted, accept them.
* If there are existing connections with data waiting to be read, read it,
* buffering until a whole frame has been read.
* If there are any pending responses, buffer them until their target client
* is available, and then send the data.
*/
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
// Loop while the server is running and selected keys remain
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
if (key.isAcceptable()) {
// Accept the new connection
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
handleAccept()
/**
* Accept a new connection.
*/
private void handleAccept() throws IOException {
SelectionKey clientKey = null;
TNonblockingTransport client = null;
try {
// accept the connection
client = serverTransport.accept();
// Register the client with the selector for read events
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
// add this key to the map
FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
// Attach the frameBuffer to clientKey
clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
// something went wrong accepting.
LOGGER.warn("Exception trying to accept!", tte);
if (clientKey != null) cleanupSelectionKey(clientKey);
if (client != null) client.close();
}
}
/**
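* Creates the FrameBuffer for a new connection: an AsyncFrameBuffer if the processor is asynchronous, a plain FrameBuffer otherwise.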
*/
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
final AbstractSelectThread selectThread) throws TTransportException {
return processorFactory_.isAsyncProcessor() ?
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
new FrameBuffer(trans, selectionKey, selectThread);
}
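The run(), select(), and handleAccept() trio boils down to one thread multiplexing accepts and reads over a single selector. The sketch below reproduces that structure with plain java.nio as an echo server. It is an illustration under assumptions: `SelectorLoopSketch` is a hypothetical class, the per-connection attachment is a bare ByteBuffer instead of a FrameBuffer, and replies are written directly rather than by registering for write events, which is only acceptable for tiny payloads.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorLoopSketch {
    private final Selector selector;
    private final ServerSocketChannel server;
    private volatile boolean stopped = false;

    SelectorLoopSketch() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    void stop() {
        stopped = true;
        selector.wakeup(); // make select() return so the loop sees the flag
    }

    // One thread handles accepts and reads for every connection.
    void run() throws IOException {
        try {
            while (!stopped) {
                selector.select();
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (!stopped && keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            // attach per-connection state to the key
                            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(256));
                        }
                    } else if (key.isReadable()) {
                        echo(key);
                    }
                }
            }
        } finally {
            for (SelectionKey k : selector.keys()) {
                k.channel().close();
            }
            selector.close();
        }
    }

    private static void echo(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.clear();
        if (client.read(buf) < 0) { // peer closed the connection
            key.cancel();
            client.close();
            return;
        }
        buf.flip();
        while (buf.hasRemaining()) {
            client.write(buf);
        }
    }

    // Starts the loop on its own thread, echoes one message through it, then stops it.
    static String roundTrip(String message) {
        try {
            SelectorLoopSketch loop = new SelectorLoopSketch();
            Thread t = new Thread(() -> {
                try {
                    loop.run();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            t.start();
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            byte[] reply = new byte[payload.length];
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), loop.port())) {
                socket.getOutputStream().write(payload);
                socket.getOutputStream().flush();
                InputStream in = socket.getInputStream();
                int off = 0;
                while (off < reply.length) {
                    int n = in.read(reply, off, reply.length - off);
                    if (n < 0) {
                        break;
                    }
                    off += n;
                }
            } finally {
                loop.stop();
                t.join();
            }
            return new String(reply, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because handler work runs on the selector thread here, one slow request delays every other connection; that is the limitation the thread-pool-backed variants address.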
(6.5.3) AbstractNonblockingServer
handleRead(key)
package org.apache.thrift.server;
// class AbstractNonblockingServer
/**
* Do the work required to read from a readable client. If the frame is
* fully read, then invoke the method call.
*/
protected void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
// read() returns false when the read failed or the peer closed the connection: drop the key
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// if the buffer's frame read is complete, invoke the method.
if (buffer.isFrameFullyRead()) {
// requestInvoke returns false if the invocation could not be scheduled
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
package org.apache.thrift.server;
// class TNonblockingServer
/**
* Perform an invocation. This method could behave several different ways
* - invoke immediately inline, queue for separate execution, etc.
*/
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
return true;
}
package org.apache.thrift.server;
// class AbstractNonblockingServer.FrameBuffer
/**
* Actually invoke the method signified by this FrameBuffer.
*/
public void invoke() {
// Point the input transport at the frame just read and clear the response buffer
frameTrans_.reset(buffer_.array());
response_.reset();
try {
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
}
// Get the processor and dispatch to the service method
processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
responseReady();
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);
} catch (Throwable t) {
LOGGER.error("Unexpected throwable while invoking!", t);
}
// This will only be reached when there is a throwable.
state_ = FrameBufferState.AWAITING_CLOSE;
requestSelectInterestChange();
}
handleWrite(key)
//
waitForShutdown()
package org.apache.thrift.server;
// class TNonblockingServer
/**
* Wait for the server to shut down.
*/
@Override
protected void waitForShutdown() {
//
joinSelector();
}
/**
* Block until the selector thread exits.
*/
protected void joinSelector() {
// wait until the selector thread exits
try {
selectAcceptThread_.join();
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for accept thread", e);
Thread.currentThread().interrupt();
}
}
stop()
/**
* Stop serving and shut everything down.
*/
@Override
public void stop() {
stopped_ = true;
if (selectAcceptThread_ != null) {
// Wake the selector so that the run loop notices stopped_
selectAcceptThread_.wakeupSelector();
}
}
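Setting the flag alone would not stop the server, because the selector thread may be parked inside select() with nothing to wake it. A small sketch of the flag-plus-wakeup pairing, using a selector with no channels registered so that select() can only return through wakeup(); `WakeupStopSketch` is a hypothetical class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeupStopSketch {
    private volatile boolean stopped = false;
    private final Selector selector;

    WakeupStopSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The "selector thread": blocks in select() until told to stop.
    void run() {
        try {
            while (!stopped) {
                selector.select(); // no channel is registered, so only wakeup() ends this call
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void stop() {
        stopped = true;    // 1. make the loop condition false
        selector.wakeup(); // 2. force select() to return so the loop re-checks it
    }

    // Returns true if the selector thread exits within five seconds of stop().
    static boolean stopsPromptly() {
        WakeupStopSketch server = new WakeupStopSketch();
        Thread t = new Thread(server::run);
        t.start();
        server.stop();
        try {
            t.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsPromptly());
    }
}
```

The order of the two statements in stop() matters: waking the selector before setting the flag could let the loop re-enter select() and block again.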
(6.6) THsHaServer
package org.apache.thrift.server;
/**
* An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class THsHaServer extends TNonblockingServer {
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the Selector to workers.
private final ExecutorService invoker;
private final Args args;
/**
* Create the server with the specified Args configuration
*/
public THsHaServer(Args args) {
super(args);
invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
this.args = args;
}
}
(6.6.1) createInvokerPool()
/**
* Helper to create an invoker pool
*/
protected static ExecutorService createInvokerPool(Args options) {
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
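One consequence of combining ThreadPoolExecutor with an unbounded LinkedBlockingQueue is worth seeing in isolation: the JDK only creates threads beyond the core size when the queue refuses a task, and an unbounded queue never refuses, so the maximum size has no effect. The sketch below demonstrates this standard executor behaviour; `UnboundedQueuePoolSketch` and `poolSizeUnderLoad` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedQueuePoolSketch {

    // Submits `tasks` blocking tasks and reports how many threads the pool created.
    static int poolSizeUnderLoad(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Extra tasks are queued rather than given new threads.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(1, 4, 3));
    }
}
```

So for a pool built this way, the number of worker threads is governed by the minimum setting; callers who want a different policy can supply their own executor through the args.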
(6.6.2) requestInvoke()
/**
* We override the standard invoke method here to queue the invocation for
* invoker service instead of immediately invoking. The thread pool takes care
* of the rest.
*/
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
try {
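// Wrap the frame buffer in a Runnable task (this does not create a thread)
Runnable invocation = getRunnable(frameBuffer);
// Hand the task to the invoker pool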
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
}
(6.6.3) waitForShutdown()
/**
* {@inheritDoc}
*/
@Override
protected void waitForShutdown() {
joinSelector();
gracefullyShutdownInvokerPool();
}
protected void gracefullyShutdownInvokerPool() {
// try to gracefully shut down the executor service
invoker.shutdown();
// Loop until awaitTermination finally does return without a interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
// Wait for queued and running invocations to finish
invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}
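The shutdown loop relies on two properties: shutdown() stops new submissions but lets already queued tasks run, and an interrupted awaitTermination() is retried with whatever time budget remains. The following sketch reproduces the loop against a plain executor to show that queued work still completes. GracefulShutdownDemo and its method names are illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Thrift.
class GracefulShutdownDemo {
    /** Same loop as above: keep waiting out the remaining budget across interrupts. */
    static void shutdownGracefully(ExecutorService invoker, long timeoutMs) {
        invoker.shutdown(); // no new tasks, queued tasks still run
        long now = System.currentTimeMillis();
        while (timeoutMs >= 0) {
            try {
                invoker.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
                break;
            } catch (InterruptedException ix) {
                long newNow = System.currentTimeMillis();
                timeoutMs -= (newNow - now); // subtract the time already spent
                now = newNow;
            }
        }
    }

    /** Queues short tasks, shuts down, and reports how many actually finished. */
    static int completedAfterShutdown(int tasks) {
        ExecutorService invoker = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            invoker.execute(() -> {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        shutdownGracefully(invoker, 5000);
        return done.get();
    }
}
```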
(6.7) TThreadedSelectorServer
/**
* A Half-Sync/Half-Async server with a separate pool of threads to handle
* non-blocking I/O. Accepts are handled on a single thread, and a configurable
* number of nonblocking selector threads manage reading and writing of client
* connections. A synchronous worker thread pool handles processing of requests.
*
* Performs better than TNonblockingServer/THsHaServer in multi-core
* environments when the bottleneck is CPU on the single selector thread
* handling I/O. In addition, because the accept handling is decoupled from
* reads/writes and invocation, the server has better ability to handle back-
* pressure from new connections (e.g. stop accepting when busy).
*
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
private final Args args;
/**
* Create the server with the specified Args configuration
*/
public TThreadedSelectorServer(Args args) {
super(args);
args.validate();
invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
this.args = args;
}
}
serve() flow
- startThreads();
- startListening();
- setServing(true);
- waitForShutdown();
- setServing(false);
- stopListening();
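The serve() steps listed above can be sketched as a template method. This is a stand-in class that only records the call order, not the Thrift implementation. The early return when startThreads() or startListening() fails is an assumption based on their boolean return type and should be checked against your Thrift version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the server base class; it only records call order.
class ServeFlowSketch {
    final List<String> calls = new ArrayList<>();
    private final boolean threadsStart;

    ServeFlowSketch(boolean threadsStart) {
        this.threadsStart = threadsStart;
    }

    boolean startThreads()           { calls.add("startThreads"); return threadsStart; }
    boolean startListening()         { calls.add("startListening"); return true; }
    void setServing(boolean serving) { calls.add("setServing(" + serving + ")"); }
    void waitForShutdown()           { calls.add("waitForShutdown"); } // blocks in a real server
    void stopListening()             { calls.add("stopListening"); }

    void serve() {
        if (!startThreads()) return;   // assumption: bail out if the IO threads fail to start
        if (!startListening()) return; // assumption: bail out if the socket cannot listen
        setServing(true);
        waitForShutdown();
        setServing(false);
        stopListening();
    }

    static List<String> trace(boolean threadsStart) {
        ServeFlowSketch server = new ServeFlowSketch(threadsStart);
        server.serve();
        return server.calls;
    }
}
```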
(6.7.1) startThreads()
/**
* Start the accept and selector threads running to deal with clients.
*
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
@Override
protected boolean startThreads() {
try {
for (int i = 0; i < args.selectorThreads; ++i) {
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
}
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));
for (SelectorThread thread : selectorThreads) {
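// Start every selector thread first so they are ready to take accepted connections
thread.start();
}
// Then start the accept thread
acceptThread.start();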
return true;
} catch (IOException e) {
LOGGER.error("Failed to start threads!", e);
return false;
}
}
(6.7.2) SelectorThread
/**
* The SelectorThread(s) will be doing all the selecting on accepted active
* connections.
*/
protected class SelectorThread extends AbstractSelectThread {
// Accepted connections added by the accept thread.
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
private long MONITOR_PERIOD = 1000L;
private int jvmBug = 0;
/**
* Set up the SelectorThread with an unbounded queue for incoming accepts.
*
* @throws IOException
* if a selector cannot be created
*/
public SelectorThread() throws IOException {
this(new LinkedBlockingQueue<TNonblockingTransport>());
}
}
SelectorThread::run()
/**
* The work loop. Handles selecting (read/write IO), dispatching, and
* managing the selection preferences of all existing connections.
*/
public void run() {
try {
while (!stopped_) {
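// Wait for read/write events on registered connections and handle them
select();
// Register connections handed over by the accept thread
processAcceptedConnections();
// Apply read/write interest changes requested by FrameBuffers
processInterestChanges();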
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
// This will wake up the accept thread and the other selector threads
TThreadedSelectorServer.this.stop();
}
}
SelectorThread::select()
/**
* Select and process IO events appropriately: If there are existing
* connections with data waiting to be read, read it, buffering until a
* whole frame has been read. If there are any pending responses, buffer
* them until their target client is available, and then send the data.
*/
private void select() {
try {
doSelect();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
doSelect()
/**
* Run select() and detect whether the epoll spin bug has occurred.
* See : https://issues.apache.org/jira/browse/THRIFT-4251
*/
private void doSelect() throws IOException {
long beforeSelect = System.currentTimeMillis();
int selectedNums = selector.select();
long afterSelect = System.currentTimeMillis();
if (selectedNums == 0) {
jvmBug++;
} else {
jvmBug = 0;
}
long selectedTime = afterSelect - beforeSelect;
if (selectedTime >= MONITOR_PERIOD) {
jvmBug = 0;
} else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
rebuildSelector();
selector.selectNow();
jvmBug = 0;
}
}
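The counting logic in doSelect() is easier to follow when separated from the Selector. The sketch below keeps the same three rules: count consecutive empty returns, forget them whenever a select call blocked for at least the monitor period, and signal a rebuild once the count exceeds the threshold. SelectSpinDetector and its method names are hypothetical.

```java
// Hypothetical helper that isolates the counter logic of doSelect().
class SelectSpinDetector {
    private final int threshold;  // SELECTOR_AUTO_REBUILD_THRESHOLD in the source
    private final long monitorMs; // MONITOR_PERIOD in the source
    private int emptyWakeups = 0; // jvmBug in the source

    SelectSpinDetector(int threshold, long monitorMs) {
        this.threshold = threshold;
        this.monitorMs = monitorMs;
    }

    /** Records one select() result; returns true when the selector should be rebuilt. */
    boolean record(int selectedNums, long elapsedMs) {
        if (selectedNums == 0) {
            emptyWakeups++;
        } else {
            emptyWakeups = 0;
        }
        if (elapsedMs >= monitorMs) {
            emptyWakeups = 0; // select() really blocked, so this is not a spin
            return false;
        }
        if (emptyWakeups > threshold) {
            emptyWakeups = 0;
            return true;
        }
        return false;
    }

    /** Counts how many instant, empty select() returns it takes to trigger a rebuild. */
    static int callsUntilRebuild(int threshold, long monitorMs) {
        SelectSpinDetector detector = new SelectSpinDetector(threshold, monitorMs);
        int calls = 0;
        do {
            calls++;
        } while (!detector.record(0, 0));
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsUntilRebuild(512, 1000L)); // prints 513
    }
}
```

Because the comparison is a strict greater-than, the rebuild happens on the 513th consecutive empty return with the source's threshold of 512.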
rebuildSelector()
/**
* Replaces the current Selector of this SelectorThread with newly created Selector to work
* around the infamous epoll 100% CPU bug.
*/
private synchronized void rebuildSelector() {
final Selector oldSelector = selector;
if (oldSelector == null) {
return;
}
Selector newSelector = null;
try {
newSelector = Selector.open();
LOGGER.warn("Created new Selector.");
} catch (IOException e) {
LOGGER.error("Create new Selector error.", e);
}
for (SelectionKey key : oldSelector.selectedKeys()) {
if (!key.isValid() && key.readyOps() == 0)
continue;
SelectableChannel channel = key.channel();
Object attachment = key.attachment();
try {
if (attachment == null) {
channel.register(newSelector, key.readyOps());
} else {
channel.register(newSelector, key.readyOps(), attachment);
}
} catch (ClosedChannelException e) {
LOGGER.error("Register new selector key error.", e);
}
}
selector = newSelector;
try {
oldSelector.close();
} catch (IOException e) {
LOGGER.error("Close old selector error.", e);
}
LOGGER.warn("Replace new selector success.");
}
processAcceptedConnections()
private void processAcceptedConnections() {
// Register accepted connections
while (!stopped_) {
TNonblockingTransport accepted = acceptedQueue.poll();
if (accepted == null) {
break;
}
registerAccepted(accepted);
}
}
registerAccepted()
private void registerAccepted(TNonblockingTransport accepted) {
SelectionKey clientKey = null;
try {
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
clientKey.attach(frameBuffer);
} catch (IOException | TTransportException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
if (clientKey != null) {
cleanupSelectionKey(clientKey);
}
accepted.close();
}
}
processInterestChanges()
SelectorThread does not override processInterestChanges(); it uses the parent-class implementation in AbstractNonblockingServer:
package org.apache.thrift.server;
// class AbstractNonblockingServer
/**
* Check to see if there are any FrameBuffers that have switched their
* interest type from read to write or vice versa.
*/
protected void processInterestChanges() {
synchronized (selectInterestChanges) {
for (FrameBuffer fb : selectInterestChanges) {
fb.changeSelectInterests();
}
selectInterestChanges.clear();
}
}
AcceptThread
/**
* The thread that selects on the server transport (listen socket) and accepts
* new connections to hand off to the IO selector threads
*/
protected class AcceptThread extends Thread {
// The listen socket to accept on
private final TNonblockingServerTransport serverTransport;
private final Selector acceptSelector;
private final SelectorThreadLoadBalancer threadChooser;
/**
* Set up the AcceptThead
*
* @throws IOException
*/
public AcceptThread(TNonblockingServerTransport serverTransport,
SelectorThreadLoadBalancer threadChooser) throws IOException {
this.serverTransport = serverTransport;
this.threadChooser = threadChooser;
this.acceptSelector = SelectorProvider.provider().openSelector();
this.serverTransport.registerSelector(acceptSelector);
}
}
AcceptThread::run()
/**
* The work loop. Selects on the server transport and accepts. If there was
* a server transport that had blocking accepts, and returned on blocking
* client transports, that should be used instead
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
select();
}
} catch (Throwable t) {
LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
} finally {
try {
acceptSelector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing accept selector!", e);
}
// This will wake up the selector threads
TThreadedSelectorServer.this.stop();
}
}
AcceptThread::select()
/**
* Select and process IO events appropriately: If there are connections to
* be accepted, accept them.
*/
private void select() {
try {
// wait for connect events.
acceptSelector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
// Accept the new connection and hand it to a selector thread
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
handleAccept()
/**
* Accept a new connection.
*/
private void handleAccept() {
final TNonblockingTransport client = doAccept();
if (client != null) {
// Pass this connection to a selector thread
final SelectorThread targetThread = threadChooser.nextThread();
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
doAddAccept(targetThread, client);
} else {
// FAIR_ACCEPT
try {
invoker.submit(new Runnable() {
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
// close immediately
client.close();
}
}
}
}
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
if (!thread.addAcceptedConnection(client)) {
client.close();
}
}
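handleAccept() asks threadChooser.nextThread() which selector thread should own the new connection. The source of SelectorThreadLoadBalancer is not shown here, so the sketch below assumes a simple round-robin policy. RoundRobinChooser and nextTarget() are illustrative names, and the real implementation may differ.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical round-robin chooser; assumed policy, not copied from Thrift.
class RoundRobinChooser<T> {
    private final List<T> targets;
    private Iterator<T> next;

    RoundRobinChooser(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("need at least one target");
        }
        this.targets = targets;
        this.next = targets.iterator();
    }

    /** Returns targets in order and wraps around at the end. */
    T nextTarget() {
        if (!next.hasNext()) {
            next = targets.iterator();
        }
        return next.next();
    }

    /** Assigns five connections to three selector names and returns the sequence. */
    static String demo() {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("sel-0", "sel-1", "sel-2"));
        StringBuilder picked = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            picked.append(chooser.nextTarget()).append(' ');
        }
        return picked.toString().trim();
    }
}
```

The chooser is unsynchronized on purpose: in the code above it is only ever called from the single accept thread.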
requestInvoke()
/**
* We override the standard invoke method here to queue the invocation for
* invoker service instead of immediately invoking. If there is no thread
* pool, handle the invocation inline on this thread
*/
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
// Wrap the frame buffer in a Runnable (an Invocation)
Runnable invocation = getRunnable(frameBuffer);
if (invoker != null) {
try {
// Queue the task on the worker thread pool
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
} else {
// Invoke on the caller's thread
invocation.run();
return true;
}
}
protected Runnable getRunnable(FrameBuffer frameBuffer) {
return new Invocation(frameBuffer);
}
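Unlike the THsHaServer version, this requestInvoke() falls back to running the invocation on the calling (selector) thread when there is no worker pool. A small sketch of the two paths, with hypothetical names and no Thrift types:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Thrift.
class InlineOrPooledDemo {
    /** Same branching as above, with rejection handling left out for brevity. */
    static boolean requestInvoke(ExecutorService invoker, Runnable invocation) {
        if (invoker != null) {
            invoker.execute(invocation); // runs later on a worker thread
            return true;
        }
        invocation.run();                // runs right now on the caller's thread
        return true;
    }

    /** Returns true if the task ran on the thread that called requestInvoke(). */
    static boolean ranOnCaller(ExecutorService invoker) {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        requestInvoke(invoker, () -> ranOn.set(Thread.currentThread()));
        if (invoker != null) {
            invoker.shutdown();
            try {
                invoker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ranOn.get() == Thread.currentThread();
    }
}
```

Running handlers inline means a slow handler blocks every connection owned by that selector thread, which is the usual reason to keep a worker pool.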
waitForShutdown()
/**
* Joins the accept and selector threads and shuts down the executor service.
*/
@Override
protected void waitForShutdown() {
try {
joinThreads();
} catch (InterruptedException e) {
// Non-graceful shutdown occurred
LOGGER.error("Interrupted while joining threads!", e);
}
gracefullyShutdownInvokerPool();
}
protected void joinThreads() throws InterruptedException {
// wait until the io threads exit
acceptThread.join();
for (SelectorThread thread : selectorThreads) {
thread.join();
}
}
protected void gracefullyShutdownInvokerPool() {
// try to gracefully shut down the executor service
invoker.shutdown();
// Loop until awaitTermination finally does return without an interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear its task queue, closing client sockets
// appropriately.
long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}
stop()
/**
* Stop serving and shut everything down.
*/
@Override
public void stop() {
stopped_ = true;
// Stop queuing connect attempts asap
stopListening();
if (acceptThread != null) {
acceptThread.wakeupSelector();
}
if (selectorThreads != null) {
for (SelectorThread thread : selectorThreads) {
if (thread != null)
thread.wakeupSelector();
}
}
}
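stop() sets the flag first and then wakes every selector, because a thread blocked inside select() would otherwise never re-check stopped_. The sketch below shows the same pattern with a bare java.nio Selector; WakeupStopDemo and stopsPromptly() are made-up names.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Thrift.
class WakeupStopDemo {
    /** Returns true if the select loop exits within waitMs after stop is requested. */
    static boolean stopsPromptly(long waitMs) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        try {
            Selector selector = Selector.open();
            Thread loop = new Thread(() -> {
                try {
                    while (!stopped.get()) {
                        selector.select(); // blocks: no channels are registered
                    }
                } catch (IOException e) {
                    // ignored in this sketch
                }
            });
            loop.start();
            Thread.sleep(100);  // give the loop time to block in select()
            stopped.set(true);  // same order as stop(): set the flag first...
            selector.wakeup();  // ...then release the blocked select()
            loop.join(waitMs);
            boolean exited = !loop.isAlive();
            selector.close();
            return exited;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The order matters: if wakeup() came before the flag was set, the loop could re-check the flag, see false, and block again.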
Problems
(1) read tcp i/o timeout
thrift server start listened_addr:8081
17:22:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:57899: i/o timeout
17:22:14 errmsg=read tcp 127.0.0.1:8081->127.0.0.1:57899: i/o timeout
17:22:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:57899: i/o timeout
17:26:40 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:59781: i/o timeout
17:26:40 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:59781: i/o timeout
The server timed out while reading data from the client connection.
write: broken pipe
errmsg=write tcp 10.157.121.37:8300->10.159.184.191:57066: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:58842: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50344: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:38252: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22356: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:50002: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:32572: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59450: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:59320: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:41668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:36668: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:60856: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:49126: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:30506: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:55618: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:22438: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:43892: write: broken pipe
2022/11/18 11:41:33 error processing request: write tcp 10.157.121.37:8300->10.159.184.191:26776: write: broken pipe
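This one is more interesting. The call path is client -> server, and the client keeps a connection pool that caches the server's IP. A broken pipe on write means the server wrote to a connection that the peer had already closed.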
Incorrect frame size
2022/11/18 14:46:43 error processing request: Incorrect frame size (1347375956)
2022/11/18 14:46:43 error processing request: Incorrect frame size (1347375956)
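A framed transport reads the first 4 bytes of a message as a big-endian length. When that number looks absurd, it helps to print those 4 bytes as text. The number in the log, 1347375956, is 0x504F5354, which is ASCII for "POST". A plausible cause, inferred from the number alone and not confirmed by the log, is that an HTTP client (or some other non-framed client) sent a request to the framed Thrift port. The helper below is a hypothetical debugging aid, not part of Thrift.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical debugging helper, not part of Thrift.
class FrameSizeDecoder {
    /** Renders a 4-byte big-endian frame length as the ASCII text it was read from. */
    static String asAscii(int frameSize) {
        byte[] raw = ByteBuffer.allocate(4).putInt(frameSize).array(); // big-endian by default
        return new String(raw, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(asAscii(1347375956)); // prints POST
    }
}
```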
connection reset by peer
2022/11/18 16:31:05 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:62781: read: connection reset by peer
2022/11/18 16:31:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:62635: read: connection reset by peer
2022/11/18 16:31:14 error processing request: read tcp 127.0.0.1:8081->127.0.0.1:62706: read: connection reset by peer
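When debugging on the Thrift server side, the client's request has long since timed out and the client has disconnected. When the server then writes data to that connection, it reports connection reset by peer.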