情况只适合处理短连接比较多的场景。
针对连接数量非常多,数据流量比较少的场景,多路复用的IO模型就比较适合。如下图所示,每一个Channel可以把自己注册到一个独立运行的Selector线程中,这个Selector线程会轮询所有Channel的读写状态,当发现一个就绪的Channel时,就可以使用工作线程为这个Channel提供服务。这样工作线程就不需要阻塞在某一个Channel上,只有真正要进行数据读写时才分配给某个Channel,极大提高了线程的利用率。
NIO消息服务器示例
点击查看代码
@Slf4j
public class Server {
public static void main(String[] args) {
try (Selector selector = Selector.open(); ServerSocketChannel scc = ServerSocketChannel.open();) {
scc.configureBlocking(false);
scc.bind(new InetSocketAddress(8080));
final SelectionKey sccKey = scc.register(selector, 0, null);
sccKey.interestOps(SelectionKey.OP_ACCEPT); // 配置这个channel只关注accept事件
while (true) {
selector.select();
final Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // 每次准备就绪的channel会被放到selectedKeys这个集合中,但删除,需要手动删除
while (iter.hasNext()) {
final SelectionKey key = iter.next();
log.debug("key: {}", key);
iter.remove(); // 手动从集合中删除处理过的key
if (key.isAcceptable()) {
final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
final SocketChannel sc = ssc.accept();
sc.configureBlocking(false); // 使用selector时,channel都需要配置成非阻塞的
log.debug("accept: {}", sc);
final SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {
try {
final SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
final int read = sc.read(buffer);
if (read < 0) { // 客户端调用close()正常关闭时,会生成一次READ事件,实际read的返回值是-1,这里处理客户端正常退出
log.warn("cancel {}", key);
key.cancel();
} else {
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
}
log.debug("after read: {}", sc);
} catch (IOException e) { // 这里处理客户端异常退出
e.printStackTrace();
key.cancel();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
处理消息边界问题