r(channel, buffer); ? ? ? ? } ? ? } ? ? ? private void writeBuffer(final AsynchronousSocketChannel channel, ByteBuffer buffer) { ? ? ? ? channel.write(buffer, buffer, new CompletionHandler() { ? ? ? ? ? ? @Override ? ? ? ? ? ? public void completed(Integer result, ByteBuffer buffer) { ? ? ? ? ? ? ? ? if (buffer.hasRemaining()) { ? ? ? ? ? ? ? ? ? ? channel.write(buffer, buffer, this); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? // Go back and check if there is new data to write ? ? ? ? ? ? ? ? ? ? writeFromQueue(channel); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? ? @Override ? ? ? ? ? ? public void failed(Throwable exc, ByteBuffer attachment) { ? ? ? ? ? ? ? ? System.out.println("server write failed: " + exc); ? ? ? ? ? ? } ? ? ? ? }); ? ? } ? ? ? /** ? ? * Sends a message ? ? * @param string the message ? ? * @throws CharacterCodingException ? ? */ ? ? private void writeStringMessage(final AsynchronousSocketChannel channel, String msg) throws CharacterCodingException { ? ? ? ? writeMessage(channel, Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap(msg))); ? ? } }
客户端代码:
package com.stevex.app.aio; ? import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; ? import com.stevex.app.nio.CharsetHelper; ? public class Client implements Runnable{ ? ? private AsynchronousSocketChannel channel; ? ? private Helper helper; ? ? private CountDownLatch latch; ? ? private final Queue queue = new LinkedList(); ? ? private boolean writing = false; ? ? ? ? public Client(AsynchronousChannelGroup channelGroup, CountDownLatch latch) throws IOException, InterruptedException{ ? ? ? ? this.latch = latch; ? ? ? ? helper = new Helper(); ? ? ? ? initChannel(channelGroup); ? ? } ? ? ? private void initChannel(AsynchronousChannelGroup channelGroup) throws IOException { ? ? ? ? //在默认channel group下创建一个socket channel ? ? ? ? channel = AsynchronousSocketChannel.open(channelGroup); ? ? ? ? //设置Socket选项 ? ? ? ? channel.setOption(StandardSocketOptions.TCP_NODELAY, true); ? ? ? ? channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); ? ? ? ? channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); ? ? } ? ? ? public static void main(String[] args) throws IOException, InterruptedException { ? ? ? ? int sleepTime = Integer.parseInt(args[0]); ? ? ? ? Helper.sleep(sleepTime); ? ? ? ? ? ? ? ? AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory()); ? ? ? ? //只能跑一个线程,第二个线程connect会挂住,暂时不明原因 ? ? ? ? final int THREAD_NUM = 1; ? ? ? ? CountDownLatch latch = new CountDownLatch(THREAD_NUM); ? ? ? ? ? ? ? ? //创建个多线程模拟多个客户端,模拟失败,无效 ? ? ? ? //只能通过命令行同时运行多个进程来模拟多个客户端 ? ? ? ? for(int i=0; i? ? ? ? ? ? Client c = new Client(channelGroup, latch); ? ? ? ? ? ? Thread t = new Thread(c); ? ? ? ? ? ? System.out.println(t.getName() + "---start"); ? ? ? ? ? ? t.start(); ? ? ? ? ? ? //让主线程等待子线程处理再退出, 这对于异步调用无效 ? ? ? ? ? ? //t.join(); ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? latch.await(); ? ? ? ? ? ? ? ? if(channelGroup !=null){ ? ? ? ? ? ? channelGroup.shutdown(); ? ? ? ? } ? ? } ? |