写点什么

Netty 入门之可写事件以及多线程版的通信

作者:派大星
  • 2023-07-14
    辽宁
  • 本文字数:9420 字

    阅读完需:约 31 分钟

本次主要讲解如何处理 ByteBuffer 的可写事件.

先上代码:

  • Server

public static void main(String[] args) throws IOException {        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false);
        Selector selector = Selector.open();        // 注册并绑定accept事件        ssc.register(selector, SelectionKey.OP_ACCEPT);        ssc.bind(new InetSocketAddress(8080));
        while(true){            selector.select();            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();            while(iter.hasNext()){                SelectionKey key = iter.next();                // 这里记得要移除key                iter.remove();                if(key.isAcceptable()) {                    // 因为serverSocketChannel的key只有一个。所以这里简写了。直接                    SocketChannel sc = ssc.accept();                    sc.configureBlocking(false);
                    StringBuilder sb  = new StringBuilder();                    for(int i=0;i<=3000000; i++){                        sb.append("a");                    }                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());                    while (buffer.hasRemaining()){                        // 这里不能保证一次全部写入进去 返回实际写入的字节数                        int write = sc.write(buffer);                        System.out.println(write);                    }                }            }
        }    }
复制代码
  • Client

    public static void main(String[] args) throws IOException {        SocketChannel sc = SocketChannel.open();        sc.connect(new InetSocketAddress("localhost",8080));        int count = 0;
        while(true){            // 接收数据            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);            count += sc.read(buffer);            System.out.println(count);            buffer.clear();
        }    }
复制代码

上述代码的运行结果如图所示:

  • server

  • client

通过上述结果我们不难发现这个 server 端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候 Buffer 是满的( server端打印0的时候,它是无法写的)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当 buffer 满的时候,我去进行别的操作,当 buffer 清空了触发一个写事件 上代码:

  • server(就是对上述代码进行了优化)

public class WriteServer {    public static void main(String[] args) throws IOException {        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false);
        Selector selector = Selector.open();        // 注册并绑定accept事件        ssc.register(selector, SelectionKey.OP_ACCEPT);        ssc.bind(new InetSocketAddress(8080));
        while(true){            selector.select();            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();            while(iter.hasNext()){                SelectionKey key = iter.next();                // 这里记得要移除key                iter.remove();                if(key.isAcceptable()) {                    // 因为serverSocketChannel的key只有一个。所以这里简写了。直接                    SocketChannel sc = ssc.accept();                    sc.configureBlocking(false);                    SelectionKey sckey = sc.register(selector,0,null);                    sckey.interestOps(SelectionKey.OP_READ);                    StringBuilder sb  = new StringBuilder();                    for(int i=0;i<=3000000; i++){                        sb.append("a");                    }                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());


                    // 判断是否有剩余内容                    if ( buffer.hasRemaining()){                        // 关注可写事件  这里需要注意以下,避免替换掉之前关注的 可读事件                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE );//                        sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE );                        // 把未写完的数据挂到sckey上  通过附件的方式                        sckey.attach(buffer);
                    }                }else if (key.isWritable()){                    ByteBuffer buffer = (ByteBuffer) key.attachment();                    SocketChannel sc = (SocketChannel) key.channel();                    int write = sc.write(buffer);                    System.out.println(write);                    // 清理操作                    if (!buffer.hasRemaining()){                        // 清除buffer                        key.attach(null);                        // 不需要关注可写事件                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);                    }                }            }
        }    }}

复制代码

主要就是利用附件的特性和关注可写事件

关于可读事件就讲这些,接下来给大家说一下如何利用多线程来进行优化通信,充分利用多核 CPU

如图所示:

说明

  • 黄色框框代表客户端

  • Boss 建立连接 accept 事件

  • worker 关注读写事件

单个 worker 版本
  • server

@Slf4jpublic class MultiThreadServer {    public static void main(String[] args) throws IOException {        Thread.currentThread().setName("Boss");        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false);        Selector boss = Selector.open();        SelectionKey bossKey = ssc.register(boss, 0, null);        bossKey.interestOps(SelectionKey.OP_ACCEPT);        ssc.bind(new InetSocketAddress(8080));        //1. 创建固定数量的worker 并初始化        Worker worker = new Worker("worker-0");        while (true){            boss.select();            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();            while (iterator.hasNext()){                SelectionKey key = iterator.next();                iterator.remove();                if (key.isAcceptable()){                    SocketChannel sc = ssc.accept();                    sc.configureBlocking(false);                    log.debug("connected...{}",sc.getRemoteAddress());                    //2. 关联selector                    // 初始化selector 启动worker-0                    log.debug("before register...{}",sc.getRemoteAddress());                    worker.register(sc);                    log.debug("after register...{}",sc.getRemoteAddress());
                }            }        }    }    static class Worker implements Runnable{        private Thread thread;        private Selector selector;        private String name;        private volatile boolean start = false; // 还未初始化        private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();        public Worker(String name){            this.name = name;        }        // 初始化线程 和selector        public void register(SocketChannel sc) throws IOException {            if (!start){                selector = Selector.open();                thread = new Thread(this,name);                thread.start();                start =true;            }            // 此时这里还是boss线程执行的  因为run方法才是worker-0线程 可以利用消息队列 ConcurrentLinkedDeque//            sc.register(selector,SelectionKey.OP_READ,null);            // 像队列添加任务 但是这个任务并没有立即执行  我们在worker-0线程中取出来执行            queue.add(()->{                try {                    sc.register(selector,SelectionKey.OP_READ,null);                } catch (ClosedChannelException e) {                    e.printStackTrace();                }            });            selector.wakeup();// 唤醒selector        }
        @Override        public void run() {            while (true){                try {                    selector.select();  // worker-0 阻塞 wakeup                    //  取出来的可能为空                    Runnable task = queue.poll();                    if (task !=null) {                        task.run();// 执行了   sc.register(selector,SelectionKey.OP_READ,null);                    }                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();                    while (iterator.hasNext()){                        SelectionKey key = iterator.next();                        iterator.remove();                        if (key.isReadable()) {                            ByteBuffer buffer = ByteBuffer.allocate(16);                            SocketChannel channel = (SocketChannel) key.channel();                            log.debug("read ....{}",channel.getLocalAddress());                            // 这里有很多细节 具体的我不在这里进行赘述,往期文章有写  比如这里的buffer可能会出现黏包半包                            // 客户端异常断开 检测异常  还有写的数据量过多 等等问题  往期文章都有写这里简单写一下关于多线程的逻辑                            channel.read(buffer);                            buffer.flip();                            debugAll(buffer);                        }                    }                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }
    // worker.register();log.debug("before register...{}",sc.getLocalAddress()); 启动worker-0    // sc.register(worker.selector,SelectionKey.OP_READ,null);
    //*这两个代码是运行在两个线程中  当有客户端连接的时候它会阻塞住  这里如何解决? 方法有很多,  接下来模仿Netty的    // 让register 也运行在worker-0线程中*/}
复制代码
  • Client

public class Client {    public static void main(String[] args) throws IOException {        SocketChannel sc = SocketChannel.open();        sc.connect(new InetSocketAddress("localhost",8080));        sc.write(Charset.defaultCharset().encode("1234567890abcdef"));        System.in.read();//        System.out.println("waiting.......");    }
}
复制代码
  • 注意:

这里采用了队列的方式

多个 worker 版本

主要修改的地方

  • 完整代码

public class MultiThreadServer {    public static void main(String[] args) throws IOException {        Thread.currentThread().setName("Boss");        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false);        Selector boss = Selector.open();        SelectionKey bossKey = ssc.register(boss, 0, null);        bossKey.interestOps(SelectionKey.OP_ACCEPT);        ssc.bind(new InetSocketAddress(8080));        //1. 创建固定数量的worker 并初始化        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];        for (int i = 0; i < workers.length; i++) {            workers[i] = new Worker("worker-" + i);        }        // 计时器        AtomicInteger index = new AtomicInteger();        while (true){            boss.select();            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();            while (iterator.hasNext()){                SelectionKey key = iterator.next();                iterator.remove();                if (key.isAcceptable()){                    SocketChannel sc = ssc.accept();                    sc.configureBlocking(false);                    log.debug("connected...{}",sc.getRemoteAddress());                    //2. 关联selector                    // 初始化selector 启动worker-0                    log.debug("before register...{}",sc.getRemoteAddress());                    // round robin 负载均衡算法                    workers[index.getAndIncrement() % workers.length].register(sc);                    log.debug("after register...{}",sc.getRemoteAddress());
                }            }        }    }    static class Worker implements Runnable{        private Thread thread;        private Selector selector;        private String name;        private volatile boolean start = false; // 还未初始化        private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();        public Worker(String name){            this.name = name;        }        // 初始化线程 和selector        public void register(SocketChannel sc) throws IOException {            if (!start){                selector = Selector.open();                thread = new Thread(this,name);                thread.start();                start =true;            }            queue.add(()->{                try {                    sc.register(selector,SelectionKey.OP_READ,null);                } catch (ClosedChannelException e) {                    e.printStackTrace();                }            });            selector.wakeup();// 唤醒selector        }
        @Override        public void run() {            while (true){                try {                    selector.select();  // worker-0 阻塞 wakeup                    //  取出来的可能为空                    Runnable task = queue.poll();                    if (task !=null) {                        task.run();// 执行了   sc.register(selector,SelectionKey.OP_READ,null);                    }                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();                    while (iterator.hasNext()){                        SelectionKey key = iterator.next();                        iterator.remove();                        if (key.isReadable()) {                            ByteBuffer buffer = ByteBuffer.allocate(16);                            SocketChannel channel = (SocketChannel) key.channel();                            log.debug("read ....{}",channel.getLocalAddress());                            // 这里有很多细节 具体的我不在这里进行赘述,往期文章有写  比如这里的buffer可能会出现黏包半包                            // 客户端异常断开 检测异常  还有写的数据量过多 等等问题  往期文章都有写这里简单写一下关于多线程的逻辑                            channel.read(buffer);                            buffer.flip();                            debugAll(buffer);                        }                    }                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}

复制代码

多个 worker 版本的个数获取Runtime.getRuntime().availableProcessors() 这里需要注意

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数

  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启


关于多线程版的通信讲到这里就告一段落了。


发布于: 2023-07-14阅读数: 17
用户头像

派大星

关注

微信搜索【码上遇见你】,获取更多精彩内容 2021-12-13 加入

微信搜索【码上遇见你】,获取更多精彩内容

评论

发布
暂无评论
Netty入门之可写事件以及多线程版的通信_派大星_InfoQ写作社区