Netty 入门之可写事件以及多线程版的通信
- 2023-07-14 辽宁
本文字数:9420 字
阅读完需:约 31 分钟
本次主要讲解如何处理 ByteBuffer 的可写事件.
先上代码:
Server
public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);
Selector selector = Selector.open(); // 注册并绑定accept事件 ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080));
while(true){ selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()){ SelectionKey key = iter.next(); // 这里记得要移除key iter.remove(); if(key.isAcceptable()) { // 因为serverSocketChannel的key只有一个。所以这里简写了。直接 SocketChannel sc = ssc.accept(); sc.configureBlocking(false);
StringBuilder sb = new StringBuilder(); for(int i=0;i<=3000000; i++){ sb.append("a"); } ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); while (buffer.hasRemaining()){ // 这里不能保证一次全部写入进去 返回实际写入的字节数 int write = sc.write(buffer); System.out.println(write); } } }
} }
Client
public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost",8080)); int count = 0;
while(true){ // 接收数据 ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); count += sc.read(buffer); System.out.println(count); buffer.clear();
} }
上述代码的运行结果如图所示:
server
client
通过上述结果我们不难发现这个 server 端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候 Buffer 是满的( server端打印0的时候,它是无法写的)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当 buffer 满的时候,我去进行别的操作,当 buffer 清空了触发一个写事件 上代码:
server(就是对上述代码进行了优化)
public class WriteServer { public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);
Selector selector = Selector.open(); // 注册并绑定accept事件 ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080));
while(true){ selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()){ SelectionKey key = iter.next(); // 这里记得要移除key iter.remove(); if(key.isAcceptable()) { // 因为serverSocketChannel的key只有一个。所以这里简写了。直接 SocketChannel sc = ssc.accept(); sc.configureBlocking(false); SelectionKey sckey = sc.register(selector,0,null); sckey.interestOps(SelectionKey.OP_READ); StringBuilder sb = new StringBuilder(); for(int i=0;i<=3000000; i++){ sb.append("a"); } ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 判断是否有剩余内容 if ( buffer.hasRemaining()){ // 关注可写事件 这里需要注意以下,避免替换掉之前关注的 可读事件 sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE );// sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE ); // 把未写完的数据挂到sckey上 通过附件的方式 sckey.attach(buffer);
} }else if (key.isWritable()){ ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); int write = sc.write(buffer); System.out.println(write); // 清理操作 if (!buffer.hasRemaining()){ // 清除buffer key.attach(null); // 不需要关注可写事件 key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); } } }
} }}
主要就是利用
附件的特性和关注可写事件关于可读事件就讲这些,接下来给大家说一下如何利用多线程来进行优化通信,充分利用多核 CPU
如图所示:
说明
黄色框框代表客户端
Boss 建立连接 accept 事件
worker 关注读写事件
单个 worker 版本
server
@Slf4jpublic class MultiThreadServer { public static void main(String[] args) throws IOException { Thread.currentThread().setName("Boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); //1. 创建固定数量的worker 并初始化 Worker worker = new Worker("worker-0"); while (true){ boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()){ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected...{}",sc.getRemoteAddress()); //2. 关联selector // 初始化selector 启动worker-0 log.debug("before register...{}",sc.getRemoteAddress()); worker.register(sc); log.debug("after register...{}",sc.getRemoteAddress());
} } } } static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private volatile boolean start = false; // 还未初始化 private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); public Worker(String name){ this.name = name; } // 初始化线程 和selector public void register(SocketChannel sc) throws IOException { if (!start){ selector = Selector.open(); thread = new Thread(this,name); thread.start(); start =true; } // 此时这里还是boss线程执行的 因为run方法才是worker-0线程 可以利用消息队列 ConcurrentLinkedDeque// sc.register(selector,SelectionKey.OP_READ,null); // 像队列添加任务 但是这个任务并没有立即执行 我们在worker-0线程中取出来执行 queue.add(()->{ try { sc.register(selector,SelectionKey.OP_READ,null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup();// 唤醒selector }
@Override public void run() { while (true){ try { selector.select(); // worker-0 阻塞 wakeup // 取出来的可能为空 Runnable task = queue.poll(); if (task !=null) { task.run();// 执行了 sc.register(selector,SelectionKey.OP_READ,null); } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ....{}",channel.getLocalAddress()); // 这里有很多细节 具体的我不在这里进行赘述,往期文章有写 比如这里的buffer可能会出现黏包半包 // 客户端异常断开 检测异常 还有写的数据量过多 等等问题 往期文章都有写这里简单写一下关于多线程的逻辑 channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } }
// worker.register();log.debug("before register...{}",sc.getLocalAddress()); 启动worker-0 // sc.register(worker.selector,SelectionKey.OP_READ,null);
//*这两个代码是运行在两个线程中 当有客户端连接的时候它会阻塞住 这里如何解决? 方法有很多, 接下来模仿Netty的 // 让register 也运行在worker-0线程中*/}
Client
public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost",8080)); sc.write(Charset.defaultCharset().encode("1234567890abcdef")); System.in.read();// System.out.println("waiting......."); }
}
注意:
这里采用了队列的方式
多个 worker 版本
主要修改的地方
完整代码
public class MultiThreadServer { public static void main(String[] args) throws IOException { Thread.currentThread().setName("Boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); //1. 创建固定数量的worker 并初始化 Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker("worker-" + i); } // 计时器 AtomicInteger index = new AtomicInteger(); while (true){ boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()){ SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected...{}",sc.getRemoteAddress()); //2. 关联selector // 初始化selector 启动worker-0 log.debug("before register...{}",sc.getRemoteAddress()); // round robin 负载均衡算法 workers[index.getAndIncrement() % workers.length].register(sc); log.debug("after register...{}",sc.getRemoteAddress());
} } } } static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private volatile boolean start = false; // 还未初始化 private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); public Worker(String name){ this.name = name; } // 初始化线程 和selector public void register(SocketChannel sc) throws IOException { if (!start){ selector = Selector.open(); thread = new Thread(this,name); thread.start(); start =true; } queue.add(()->{ try { sc.register(selector,SelectionKey.OP_READ,null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup();// 唤醒selector }
@Override public void run() { while (true){ try { selector.select(); // worker-0 阻塞 wakeup // 取出来的可能为空 Runnable task = queue.poll(); if (task !=null) { task.run();// 执行了 sc.register(selector,SelectionKey.OP_READ,null); } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ....{}",channel.getLocalAddress()); // 这里有很多细节 具体的我不在这里进行赘述,往期文章有写 比如这里的buffer可能会出现黏包半包 // 客户端异常断开 检测异常 还有写的数据量过多 等等问题 往期文章都有写这里简单写一下关于多线程的逻辑 channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } }}
多个 worker 版本的个数获取Runtime.getRuntime().availableProcessors() 这里需要注意
Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启
关于多线程版的通信讲到这里就告一段落了。
版权声明: 本文为 InfoQ 作者【派大星】的原创文章。
原文链接:【http://xie.infoq.cn/article/7ba745a91d8d570d3a25442ad】。
本文遵守【CC BY-NC-ND】协议,转载请保留原文出处及本版权声明。
派大星
微信搜索【码上遇见你】,获取更多精彩内容 2021-12-13 加入
微信搜索【码上遇见你】,获取更多精彩内容










评论