Netty 入门之可写事件以及多线程版的通信
- 2023-07-14 辽宁
本文字数:9420 字
阅读完需:约 31 分钟
本次主要讲解如何处理 ByteBuffer 的可写事件.
先上代码:
Server
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
// 注册并绑定accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
// 这里记得要移除key
iter.remove();
if(key.isAcceptable()) {
// 因为serverSocketChannel的key只有一个。所以这里简写了。直接
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
StringBuilder sb = new StringBuilder();
for(int i=0;i<=3000000; i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
while (buffer.hasRemaining()){
// 这里不能保证一次全部写入进去 返回实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
}
}
}
}
}
Client
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
int count = 0;
while(true){
// 接收数据
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
上述代码的运行结果如图所示:
server
client
通过上述结果我们不难发现这个 server 端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候 Buffer 是满的( server端打印0的时候,它是无法写的
)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当 buffer 满的时候,我去进行别的操作,当 buffer 清空了触发一个写事件 上代码:
server(就是对上述代码进行了优化)
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
// 注册并绑定accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
// 这里记得要移除key
iter.remove();
if(key.isAcceptable()) {
// 因为serverSocketChannel的key只有一个。所以这里简写了。直接
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector,0,null);
sckey.interestOps(SelectionKey.OP_READ);
StringBuilder sb = new StringBuilder();
for(int i=0;i<=3000000; i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 判断是否有剩余内容
if ( buffer.hasRemaining()){
// 关注可写事件 这里需要注意以下,避免替换掉之前关注的 可读事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE );
// sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE );
// 把未写完的数据挂到sckey上 通过附件的方式
sckey.attach(buffer);
}
}else if (key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
// 清理操作
if (!buffer.hasRemaining()){
// 清除buffer
key.attach(null);
// 不需要关注可写事件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
主要就是利用
附件
的特性和关注可写事件
关于可读事件就讲这些,接下来给大家说一下如何利用多线程来进行优化通信,充分利用多核 CPU
如图所示:
说明
黄色框框代表客户端
Boss 建立连接 accept 事件
worker 关注读写事件
单个 worker 版本
server
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//1. 创建固定数量的worker 并初始化
Worker worker = new Worker("worker-0");
while (true){
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}",sc.getRemoteAddress());
//2. 关联selector
// 初始化selector 启动worker-0
log.debug("before register...{}",sc.getRemoteAddress());
worker.register(sc);
log.debug("after register...{}",sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
public Worker(String name){
this.name = name;
}
// 初始化线程 和selector
public void register(SocketChannel sc) throws IOException {
if (!start){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
start =true;
}
// 此时这里还是boss线程执行的 因为run方法才是worker-0线程 可以利用消息队列 ConcurrentLinkedDeque
// sc.register(selector,SelectionKey.OP_READ,null);
// 像队列添加任务 但是这个任务并没有立即执行 我们在worker-0线程中取出来执行
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();// 唤醒selector
}
@Override
public void run() {
while (true){
try {
selector.select(); // worker-0 阻塞 wakeup
// 取出来的可能为空
Runnable task = queue.poll();
if (task !=null) {
task.run();// 执行了 sc.register(selector,SelectionKey.OP_READ,null);
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ....{}",channel.getLocalAddress());
// 这里有很多细节 具体的我不在这里进行赘述,往期文章有写 比如这里的buffer可能会出现黏包半包
// 客户端异常断开 检测异常 还有写的数据量过多 等等问题 往期文章都有写这里简单写一下关于多线程的逻辑
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// worker.register();log.debug("before register...{}",sc.getLocalAddress()); 启动worker-0
// sc.register(worker.selector,SelectionKey.OP_READ,null);
//*这两个代码是运行在两个线程中 当有客户端连接的时候它会阻塞住 这里如何解决? 方法有很多, 接下来模仿Netty的
// 让register 也运行在worker-0线程中*/
}
Client
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
System.in.read();
// System.out.println("waiting.......");
}
}
注意:
这里采用了队列的方式
多个 worker 版本
主要修改的地方
完整代码
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//1. 创建固定数量的worker 并初始化
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
// 计时器
AtomicInteger index = new AtomicInteger();
while (true){
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}",sc.getRemoteAddress());
//2. 关联selector
// 初始化selector 启动worker-0
log.debug("before register...{}",sc.getRemoteAddress());
// round robin 负载均衡算法
workers[index.getAndIncrement() % workers.length].register(sc);
log.debug("after register...{}",sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
public Worker(String name){
this.name = name;
}
// 初始化线程 和selector
public void register(SocketChannel sc) throws IOException {
if (!start){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
start =true;
}
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();// 唤醒selector
}
@Override
public void run() {
while (true){
try {
selector.select(); // worker-0 阻塞 wakeup
// 取出来的可能为空
Runnable task = queue.poll();
if (task !=null) {
task.run();// 执行了 sc.register(selector,SelectionKey.OP_READ,null);
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ....{}",channel.getLocalAddress());
// 这里有很多细节 具体的我不在这里进行赘述,往期文章有写 比如这里的buffer可能会出现黏包半包
// 客户端异常断开 检测异常 还有写的数据量过多 等等问题 往期文章都有写这里简单写一下关于多线程的逻辑
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
多个 worker 版本的个数获取Runtime.getRuntime().availableProcessors()
这里需要注意
Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启
关于多线程版的通信讲到这里就告一段落了。
版权声明: 本文为 InfoQ 作者【派大星】的原创文章。
原文链接:【http://xie.infoq.cn/article/7ba745a91d8d570d3a25442ad】。
本文遵守【CC BY-NC-ND】协议,转载请保留原文出处及本版权声明。
派大星
微信搜索【码上遇见你】,获取更多精彩内容 2021-12-13 加入
微信搜索【码上遇见你】,获取更多精彩内容
评论