写点什么

【Netty】「NIO」(五)多线程优化

作者:sidiot
  • 2023-06-08
    浙江
  • 本文字数:4786 字

    阅读完需:约 16 分钟

前言

本篇博文是《从 0 到 1 学习 Netty》中 NIO 系列的第五篇博文,主要内容是使用多线程对程序进行优化,充分利用 CPU 的能力,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


引入

这前几篇文章中,都是采用单线程进行设计,虽然可以运行,但是没有充分利用 CPU 的性能,并且如果有一个事件的处理时间较长,则会影响其他事件的处理。


例如,开发一个项目,如果团队只有一个全栈工程师,那么他需要先完成前端,再完成后端,只能按部就班的完成任务,如果前端开发遭遇困难,花费了很多时间,则会大大拉长项目开发周期,而如果一个团队里有前端工程师和后端工程师,则前后端的开发能同步进行,这样会大大提高开发效率。


同理,对之前的代码进行优化,分两组选择器:


  • 选择一个线程配置一个选择器,作为 ‘Boss’,专门处理 accept 事件

  • 创建多个线程(最好与 CPU 核心数一直),作为 ‘Worker’,每个线程配置一个选择器,轮流处理 readwrite 等事件



实现

1、创建一个 Boss 线程,负责处理 accept 事件类型:


Thread.currentThread().setName("Boss");Selector boss = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(7999));
while (true) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); } }}
复制代码


2、创建 Worker 类,用于初始化 Worker 线程和 Selector,负责处理 read 事件类型:


class Worker implements Runnable{    private Thread thread;    private volatile Selector worker;    private String name;
public Worker(String name) { this.name = name; }
public void register() throws IOException { this.thread = new Thread(this, this.name); this.worker = Selector.open(); this.thread.start(); } @Override public void run() { while (true) { try { this.worker.select(); Iterator<SelectionKey> iter = this.worker.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { throw new RuntimeException(e); }
} }}
复制代码


但是这里会有个问题,每次进行 register() 的时候会新创建一个线程,但我们只想一个 Worker 对应一个线程,所以我们需要对上述代码进行优化,使用标志符来进行判断是否完成过初始化:


private volatile boolean start = false;
public void register() throws IOException { if (!this.start) { this.thread = new Thread(this, this.name); this.selector = Selector.open(); this.thread.start(); this.start = true; }}
复制代码


注意,this.worker = Selector.open();this.thread.start(); 不要写反了,不然之后运行会出现空指针异常:


Exception in thread "worker-0" java.lang.NullPointerException        at com.sidiot.netty.c3.MultiThreadServer$Worker.run(MultiThreadServer.java:75)        at java.base/java.lang.Thread.run(Thread.java:832)
复制代码


3、将 Worker 进行关联,先创建一个 worker 线程:


Worker worker0 = new Worker("worker-0");worker0.register();
while (true) { ... while (iter.hasNext()) { ... if (key.isAcceptable()) { ... log.debug("connected... {}", sc.getRemoteAddress()); log.debug("before register {}", sc.getRemoteAddress()); sc.register(worker0.selector, SelectionKey.OP_READ, null); log.debug("after register {}", sc.getRemoteAddress()); } }}
复制代码


4、编写客户端:


public class MultiThreadClient {      public static void main(String[] args) throws IOException {          SocketChannel sc = SocketChannel.open();          sc.connect(new InetSocketAddress("localhost", 7999));          sc.write(Charset.defaultCharset().encode("Hello, World! --sidiot."));          System.in.read();      }  }
复制代码


5、运行服务端和客户端,运行结果如下:


20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5061220:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5061220:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:50612
复制代码


发现 worker 并没有进行工作,或者说是客户端发送的数据并没有进入到 worker 的可读事件中,这是因为在 worker 的 run() 方法运行时,SocketChannel 还没有注册到 worker 的 selector 中,导致 worker 线程在 this.selector.select(); 的位置发生了阻塞;


6、由于 sc.register 发生在 boss 线程中,而 select 发生在 worker 线程中,无法确定两个线程的执行顺序,因此需要把两步操作都放入一个线程中;


SocketChannel 传到到 Worker 的 register() 方法中:


public void register(SocketChannel sc) throws IOException {      if (!this.start) {          this.thread = new Thread(this, this.name);          this.selector = Selector.open();          this.thread.start();          this.start = true;      }        sc.register(this.selector, SelectionKey.OP_READ, null);  }
复制代码


但这样还是不行的,因为 register() 方法还是在 boss 线程中执行,这就需要使用队列来完成线程间的通信了:


private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public void register(SocketChannel sc) throws IOException { ... this.queue.add(() -> { try { sc.register(this.selector, SelectionKey.OP_READ, null); } catch (ClosedChannelException e) { throw new RuntimeException(e); } }); this.selector.wakeup();}
@Override public void run() { while (true) { try { this.selector.select(); Runnable task = this.queue.poll(); if (task != null) { task.run(); } Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator(); ... }}
复制代码


注意,这里需要 this.selector.wakeup(); 来唤醒 selector 继续往下走;


还有另一种方法,参考代码点击这里;


7、将单线程 worker 转成多线程:


Worker[] workers = new Worker[4];  for (int i = 0; i < workers.length; i++) {      workers[i] = new Worker("worker-" + i);  }
复制代码


同时使用计数器来实现各个 worker 线程的轮询使用:


AtomicInteger index = new AtomicInteger();
while (true) { ... while (iter.hasNext()) { ... if (key.isAcceptable()) { ... workers[index.getAndIncrement() % workers.length].register(sc); } } }
复制代码


运行结果:


22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5466822:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5466822:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:5466822:36:13 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54668+--------+-------------------- all ------------------------+----------------+position: [0], limit: [7]         +-------------------------------------------------+         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|+--------+-------------------------------------------------+----------------+
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5467622:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5467622:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:5467622:36:20 [DEBUG] [worker-1] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54676+--------+-------------------- all ------------------------+----------------+position: [0], limit: [7] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|+--------+-------------------------------------------------+----------------+
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:5468722:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:5468722:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:5468722:36:30 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54687+--------+-------------------- all ------------------------+----------------+position: [0], limit: [7] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f |+--------+-------------------------------------------------+----------------+|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|+--------+-------------------------------------------------+----------------+
复制代码


后记

以上就是 多线程优化 的所有内容了,希望本篇博文对大家有所帮助!


参考:



📝 上篇精讲:「NIO」(四)消息边界与可写事件

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多支持;

🔥 系列专栏:探索 Netty:源码解析与应用案例分享

发布于: 刚刚阅读数: 6
用户头像

sidiot

关注

还未添加个人签名 2023-06-04 加入

还未添加个人简介

评论

发布
暂无评论
【Netty】「NIO」(五)多线程优化_Java_sidiot_InfoQ写作社区