简介
server端接收数据:
我们常听到osi七层模型,这张图表明thrift的实现也有着很清晰的分层接口
- client : client –> protocol –> transport
- server : Transport –> protocol –> processor ==> Fuction
同步调用
thrfit调用客户端例子
TTransport transport = new TSocket("localhost", 1234);
TProtocol protocol = new TBinaryProtocol(transport);
RemotePingService.Client client = new RemotePingService.Client(protocol);
transport.open();
client.ping(2012);
transport.close();
基本框架类
TTransport ==> TIOStreamTransport ==> TSocket,这三个类是典型的“上层定义接口,下层负责实现”(还有一种模式是:上层定义基本功能,下层扩展新的方法)。
TTransport提供字节数组的read和write接口,底层可以是内存间的io,文件io,网络io可以是http,tcp,实现方式可以是netty等
class TIOStreamTransport extends TTransport{
protected InputStream inputStream_ = null;
protected OutputStream outputStream_ = null;
}
上述的TTransport的read和write接口,由inputStream和outputStream成员具体实现。
TSocket extends TIOStreamTransport{
private Socket socket_ = null;
private String host_ = null;
private int port_ = 0;
private int socketTimeout_ = 0;
private int connectTimeout_ = 0;
}
TSocket初始化Socket,并为其父类的inputStream和outputStream赋值。
TProtocol ==> TBinaryProtocol,TProtocol一堆writeBool,readBool等基本数据类型的方法,负责将一个方法调用中的方法名和参数等用byte[]
描述出来。
IDL生成类
假设有以下实例
namespace java org.lqk.thrift
service RemotePingService{
void ping(1: i32 length)
}
通过thrift程序会生成RemotePingService.java,里面提供了
- RemotePingService.Iface ping(int length)
- RemotePingService.syncIface ping(int length, org.apache.thrift.async.AsyncMethodCallback resultHandler)
- RemotePingService.Client
- RemotePingService.AsyncClient
- RemotePingService.Processor
- RemotePingService.AsyncProcessor
RemotePingService.Client extends org.apache.thrift.TServiceClient implements Iface
TServiceClient{
void sendBase(String methodName, TBase<?,?> args)
void sendBaseOneway(String methodName, TBase<?,?> args)
sendBase(String methodName, TBase<?,?> args, byte type)
receiveBase(TBase<?,?> result, String methodName)
}
TServiceClient提供发送数据(方法名和参数)和接收数据的一般抽象,将方法名和参数按一定的约定发出。
RemotePingService.client作为客户端关于RemotePingService的代理类,实现Iface接口,模拟出远程方法就在本地的感觉,将ping的调用转化成对sendBase和receiveBase的调用。
public void ping(int length) throws org.apache.thrift.TException{
send_ping(length);
recv_ping();
}
public void send_ping(int length) throws org.apache.thrift.TException{
ping_args args = new ping_args();
args.setLength(length);
sendBase("ping", args);
}
public void recv_ping() throws org.apache.thrift.TException{
ping_result result = new ping_result();
receiveBase(result, "ping");
return;
}
从这个角度,我们可以看到,任何rpc框架的实现都逃不过以下几个基本问题:
- 基本的网络通信,bio,nio,http,tcp,原生socket,netty等。
- 通信数据的序列化与反序列化
- 通信协议的制定
同步调用代码,只要耐心分析,还是比较简单的,并且是一种很有代表性的rpc基本实现,其类划分设计非常值得学习。
异步调用
- RemotePingService.Iface void ping(int length)
- RemotePingService.syncIface void ping(int length, org.apache.thrift.async.AsyncMethodCallback resultHandler)
解释下自己理解的一个误区,异步调用方法一般要传一个callback,但对于网络调用来说,这个callback不会传到服务端,callback是负责网络io的线程在接到数据后触发执行的。
AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface
通过观察TAsyncClient,一个初步结论是:异步调用,底层网络通信和和上层数据处理都要改动.
-
在通信层面
abstract class TNonblockingTransport extends TTransport{ public abstract boolean startConnect() throws IOException; public abstract boolean finishConnect() throws IOException; public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; public abstract int read(ByteBuffer buffer) throws IOException; public abstract int write(ByteBuffer buffer) throws IOException; }
可以看到,如果说thrift同步调用对通信模型的选择基本没有限定的话,那么thrift异步调用就只能选nio(至少nio相对其它方案是比较好的选择)。
class TNonblockingSocket extends TNonblockingTransport
基本逻辑就是初始化SocketChannel,并使用SocketChannel实现TNonblockingTransport和TTransport定义的方法 -
在上层实现层面
// AsyncClient.ping public void ping(int length, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {checkReady(); ping_call method_call = new ping_call(length, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } // TAsyncClientManager public void call(TAsyncMethodCall method) throws TException { if (!isRunning()) { throw new TException("SelectThread is not running"); } method.prepareMethodCall(); pendingCalls.add(method); selectThread.getSelector().wakeup(); }
TAsyncClientManager,Contains selector thread which transitions(过渡,转场) methodcall objects.
TAsyncClientManager有一个SelectThread和pendingCalls(
ConcurrentLinkedQueue<TAsyncMethodCall>
),干的事跟netty的eventloop很像:- 监听连接;
-
处理pendingCalls中的TAsyncMethodCall,从类设计上TAsyncMethodCall会“自给自足”并记住自己执行到哪了(该发数据,还是该收数据),只需将selectKey传给它即可。TAsyncMethodCall定义了一系列状态,并有一个方法,只要传给它一个key,它会处理连接、发送和读取结果的所有逻辑,完成状态的转换。
protected void transition(SelectionKey key) { // Ensure key is valid if (!key.isValid()) { key.cancel(); Exception e = new TTransportException("Selection key not valid!"); onError(e); return; } // Transition function try { switch (state) { case CONNECTING: doConnecting(key); break; case WRITING_REQUEST_SIZE: doWritingRequestSize(); break; case WRITING_REQUEST_BODY: doWritingRequestBody(key); break; case READING_RESPONSE_SIZE: doReadingResponseSize(); break; case READING_RESPONSE_BODY: doReadingResponseBody(key); // 这里会调用callback break; default: // RESPONSE_READ, ERROR, or bug throw new IllegalStateException("Method call in state " + state + " but selector called transition method. Seems like a bug..."); } } catch (Exception e) { key.cancel(); key.attach(null); onError(e); } }
汇总起来,一次异步ping的执行就是:
- 实例化一个TAsyncMethodCall(这里是ping_call),加入到TAsyncClientManager.pendingCalls队列中。
- TAsyncClientManager.selectThread运行eventloop逻辑,驱动TAsyncMethodCall状态的转换,最终在读取response后,执行callback。
总体来讲,异步请求的逻辑要比同步请求复杂的多,这种复杂性主要是nio的通信方式带来的
- 将发送与接收的数据交给ByteBuffer,并留意数据的清除
- 处理nio,往往需要一个eventloop(一个线程(或线程池))驱动io数据的处理
- 逻辑的复杂性增加了类划分的复杂性