Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
.build();
server.start();
privatefinalint callId; // the client's call id 客户端idprivatefinalint retryCount; // the retry count of the call 重试次数privatefinal Writable rpcRequest; // Serialized Rpc request from client 序列号的请求privatefinal Connection connection; // connection to clientprivatelong timestamp; // time received when response is null// time served when response is not nullprivate ByteBuffer rpcResponse; // the response for this callprivatefinal RPC.RpcKind rpcKind;
privatefinalbyte[] clientId;
privatefinal Span traceSpan; // the tracing span on the server side
protectedServer(String bindAddress, int port,Class< extends Writable> rpcRequestClass, int handlerCount,int numReaders, int queueSizePerHandler, Configuration conf,String serverName, SecretManager< extends TokenIdentifier> secretManager,String portRangeConfig) throws IOException {
…………………………………..
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),maxQueueSize, prefix, conf);
…………………………………………
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
………………………………………………..
// Create the responder here
responder = new Responder();
…………………………………………….
}
publicsynchronizedvoidstart() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
publicListener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
//监听OP_ACCEPT事件
**acceptChannel.register(selector, SelectionKey.OP_ACCEPT);**
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
Reader线程读取数据
通过Listener的run方法我们看到如果一旦接受到请求,然后就让reader去处理
Connection c = connectionManager.register(channel);
// If the connectionManager can't take it, close the connection.if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
continue;
}
key.attach(c); // so closeCurrentConnection can get the object
**reader.addConnection(c);**