跟着源码学 IM(八):万字长文,手把手教你用 Netty 打造 IM 聊天
本文作者芋艿,原题“使用 Netty 实现 IM 聊天贼简单”,本次有修订和改动。
一、本文引言
上篇《跟着源码学 IM(七):手把手教你用 WebSocket 打造 Web 端 IM 聊天》中,我们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。
然后就有人发私信,希望使用纯 Netty 实现一个类似的功能,因此就有了本文。
注:源码请从同步链接附件中下载,http://www.52im.net/thread-3489-1-1.html。
学习交流:
移动端 IM 开发入门文章:《新手入门一篇就够:从零开发移动端 IM》
开源 IM 框架源码:https://github.com/JackJiang2011/MobileIMSDK
(本文同步发布于:http://www.52im.net/thread-3489-1-1.html)
二、知识准备
可能有人不知道 Netty 是什么,这里简单介绍下:
Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用 Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。
Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。
以下是几篇有关 Netty 的入门文章,值得一读:
《新手入门:目前为止最透彻的的 Netty 高性能原理和框架架构解析》《写给初学者:Java 高性能 NIO 框架 Netty 的学习方法和进阶策略》《史上最通俗 Netty 框架入门长文:基本介绍、环境搭建、动手实战》如果你连 Java 的 NIO 都不知道是什么,下面的文章建议优先读一下:
《少啰嗦!一分钟带你读懂 Java 的 NIO 和经典 IO 的区别》《史上最强 Java NIO 入门:担心从入门到放弃的,请读这篇!》《Java 的 BIO 和 NIO 很难懂?用代码实践给你看,再不懂我转行!》Netty 源码和 API 的在线阅读地址:
1)Netty-4.1.x 完整源码(在线阅读版)(* 推荐)2)Netty-4.0.x 完整源码(在线阅读版)3)Netty-4.1.x API 文档(在线版)(* 推荐)4)Netty-4.0.x API 文档(在线版)三、本文源码本文完整代码附件下载:请从同步链接附件中下载,http://www.52im.net/thread-3489-1-1.html。
源码的目录结构,如下图所示:
如上图所示:
1)lab-67-netty-demo-server 项目:搭建 Netty 服务端;2)lab-67-netty-demo-client 项目:搭建 Netty 客户端;3)lab-67-netty-demo-common 项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。另外,源码中也会提供 Netty 常用功能的示例:
1)心跳机制,实现服务端对客户端的存活检测;2)断线重连,实现客户端对服务端的重新连接。不哔哔,直接开干。
五、通信协议在上一章中,我们实现了客户端和服务端的连接功能。而本小节,我们要让它们两能够说上话,即进行数据的读写。
在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用文本内容进行交互,数据格式一般是 JSON。但是在 TCP 的世界里,我们需要自己基于二进制构建,构建客户端和服务端的通信协议。
我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求。
对应的类如下:
public class AuthRequest {
}
显然:我们无法将一个 Java 对象直接丢到 TCP Socket 当中,而是需要将其转换成 byte 字节数组,才能写入到 TCP Socket 中去。即,需要将消息对象通过序列化,转换成 byte 字节数组。
同时:在服务端收到 byte 字节数组时,需要将其又转换成 Java 对象,即反序列化。不然,服务端对着一串 byte 字节处理个毛线?!
友情提示:服务端向客户端发消息,也是一样的过程哈!
序列化的工具非常多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。
如下图所示:
但是考虑到很多可能对 Protobuf 并不了解,因为它实现序列化又增加额外学习成本。因此,仔细一个捉摸,还是采用 JSON 方式进行序列化。可能有人会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,我们再把字符串转换成 byte 字节数组就可以啦~
下面,我们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现我们自定义的通信协议。
如下图所示:
5.1、Invocation 创建 Invocation 类,通信协议的消息体。
代码如下:
/**
通信协议的消息体
*/
public class Invocation {
}
① type 属性,类型,用于匹配对应的消息处理器。如果类比 HTTP 协议,type 属性相当于请求地址。
② message 属性,消息内容,使用 JSON 格式。
另外,Message 是我们定义的消息接口,代码如下:
public interface Message {
}
5.2、粘包与拆包在开始看 Invocation 的编解码处理器之前,我们先了解下粘包与拆包的概念。
5.2.1 产生原因产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。
如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。
例如说:在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。
如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。
如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况:
如上图所示:
1)A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送;2)A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;3)B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。5.2.2 解决方案对于粘包和拆包问题,常见的解决方案有三种。
① 客户端在发送数据包的时候,每个包都固定长度。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。
这种方式,暂时没有找到采用这种方式的案例。
② 客户端在每个包的末尾使用固定的分隔符。例如 \r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 \r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。具体的案例,有 HTTP、WebSocket、Redis。
③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。
友情提示:方案 ③ 是 ① 的升级版,动态长度。
本文将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。
如下图所示:
5.3、InvocationEncoder 创建 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。
代码如下:
public class InvocationEncoder extends MessageToByteEncoder<Invocation> {
}
① MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 消息转换成字节数组。
② #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。
<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。
<2.2> 处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「5.4 InvocationDecoder」可以根据该长度,解析到消息,解决粘包和拆包的问题。
友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。
<2.3> 处,将字节数组,写入到 TCP Socket 当中。
5.4、InvocationDecoder 创建 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。
代码如下:
① ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。
② 在 <2.1>、<2.2>、<2.3> 处,从 TCP Socket 中读取长度。
③ 在 <3.1>、<3.2>、<3.3> 处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。
最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,我们将在「6. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。
5.5、引入依赖创建 pom.xml 文件,引入 Netty、FastJSON 等等依赖。
5.6、本章小结至此,我们已经完成通信协议的定义、编解码的逻辑,是不是蛮有趣的?!
另外,我们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。
如下图所示:
六、消息分发在 SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。
在 lab-67-netty-demo-client 项目的 dispatcher 包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。
下面,我们来看看具体的代码实现。
6.1、Message 创建 Message 接口,定义消息的标记接口。
代码如下:
public interface Message {
}
下图,是我们涉及到的 Message 实现类。
如下图所示:
6.2、MessageHandler 创建 MessageHandler 接口,消息处理器接口。
代码如下:
public interface MessageHandler<T extendsMessage> {
}
如上述代码所示:
1)定义了泛型 <T> ,需要是 Message 的实现类;2)定义的两个接口方法,自己看下注释哈。下图,是我们涉及到的 MessageHandler 实现类。
如下图所示:
6.3、MessageHandlerContainer 创建 MessageHandlerContainer 类,作为 MessageHandler 的容器。
代码如下:
① 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。
② 在 #getMessageHandler(String type) 方法中,获得类型对应的 MessageHandler 对象。稍后,我们会在 MessageDispatcher 调用该方法。
③ 在 #getMessageClass(MessageHandler handler) 方法中,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。
6.4、MessageDispatcher 创建 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。
代码如下:
@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {
}
① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。
② SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 <I> 泛型时。
③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。
<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,获得 Invocation 的 type 对应的 MessageHandler 处理器。
然后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,获得 MessageHandler 处理器的消息类。
<3.2> 处,调用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的消息对象。
<3.3> 处,丢到线程池中,然后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行业务逻辑。
注意:为什么要丢到 executor 线程池中呢?我们先来了解下 EventGroup 的线程模型。
友情提示:在我们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。
EventGroup 我们可以先简单理解成一个线程池,并且线程池的大小仅仅是 CPU 数量 * 2。每个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。并且,多个 Channel 会共享一个线程,即使用同一个线程进行数据的读写。
那么试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中,阻塞了共享当前线程的其它 Channel 的数据读取。
因此,我们在这里创建了 executor 线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。
可能会有人说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现阻塞数据读取的情况。
友情提示:executor 线程池,我们一般称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。后续,可以认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。
6.5、NettyServerConfig 创建 NettyServerConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。
代码如下:
@Configuration
public class NettyServerConfig {
}
6.6、NettyClientConfig 创建 NettyClientConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。
代码如下:
@Configuration
public class NettyClientConfig {
}
6.7、本章小结后续,我们将在如下小节,具体演示消息分发的使用。
七、断开重连 Netty 客户端需要实现断开重连机制,解决各种情况下的断开情况。
例如说:
1)Netty 客户端启动时,Netty 服务端处于挂掉,导致无法连接上;2)在运行过程中,Netty 服务端挂掉,导致连接被断开;3)任一一端网络抖动,导致连接异常断开。具体的代码实现比较简单,只需要在两个地方增加重连机制:
1)Netty 客户端启动时,无法连接 Netty 服务端时,发起重连;2)Netty 客户端运行时,和 Netty 断开连接时,发起重连。考虑到重连会存在失败的情况,我们采用定时重连的方式,避免占用过多资源。
7.1、具体代码① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。
代码如下:
// NettyClient.java
public void reconnect() {
}
通过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起连接 Netty 服务端。
又因为 NettyClient 在 #start() 方法在连接 Netty 服务端失败时,又会调用 #reconnect() 方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端连接上 Netty 服务端。
如下图所示:
② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect() 方法,发起重连。
代码如下:
// NettyClientHandler.java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
7.2、简单测试① 启动 Netty Client,不要启动 Netty Server,控制台打印日志如下图:
可以看到 Netty Client 在连接失败时,不断发起定时重连。
② 启动 Netty Server,控制台打印如下图:
可以看到 Netty Client 成功重连上 Netty Server。
八、心跳机制与空闲检测我们可以了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来说是可以接受的。
但是在业务层面,如果 2 小时才发现客户端与服务端的连接实际已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。
因此,我们需要在业务层面,自己实现空闲检测,保证尽快发现客户端与服务端实际已经断开的情况。
实现逻辑如下:
1)服务端发现 180 秒未从客户端读取到消息,主动断开连接;2)客户端发现 180 秒未从服务端读取到消息,主动断开连接。考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加心跳机制。
逻辑如下:
1)客户端每 60 秒向服务端发起一次心跳消息,保证服务端可以读取到消息;2)服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端可以读取到消息。友情提示:
为什么是 180 秒?可以加大或者减小,看自己希望多快检测到连接异常。过短的时间,会导致心跳过于频繁,占用过多资源。
为什么是 60 秒?三次机会,确认是否心跳超时。
虽然听起来有点复杂,但是实现起来并不复杂哈。
8.1、服务端的空闲检测在 NettyServerHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。
如下图所示:
通过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开连接。
8.2、客户端的空闲检测在 NettyClientHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。
如下图所示:
通过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开连接。
8.3、心跳机制 Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。
这样,我们只需要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。
如下图所示:
其中,HeartbeatRequest 是心跳请求。
同时,我们在服务端项目中,创建了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。
代码如下:
@Component
public class HeartbeatRequestHandler implementsMessageHandler<HeartbeatRequest> {
}
其中,HeartbeatResponse 是心跳确认响应。
8.4、简单测试启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,可以看到心跳日志如下:
九、认证逻辑从本小节开始,我们就具体看看业务逻辑的处理示例。
认证的过程,如下图所示:
9.1、AuthRequest 创建 AuthRequest 类,定义用户认证请求。
代码如下:
public class AuthRequest implements Message {
/**
}
这里我们使用 accessToken 认证令牌进行认证。
因为一般情况下,我们使用 HTTP 进行登录系统,然后使用登录后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。
9.2、AuthResponse 创建 AuthResponse 类,定义用户认证响应。
代码如下:
public class AuthResponse implements Message {
}
9.3、AuthRequestHandler 服务端...
创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。
代码如下:
代码比较简单,看看 <1>、<2>、<3>、<4> 上的注释。
9.4、AuthResponseHandler 客户端...
创建 AuthResponseHandler 类,为客户端处理服务端的认证响应。
代码如下:
@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {
}
打印个认证结果,方便调试。
9.5、TestController 客户端...
创建 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。
代码如下:
@RestController
@RequestMapping("/test")
public class TestController {
}
9.6、简单测试启动 Netty Server 服务端,再启动 Netty Client 客户端,然后使用 Postman 模拟一次认证请求。
如下图所示:
同时,可以看到认证成功的日志如下:
十一、群聊逻辑群聊的过程,如下图所示:
服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。
友情提示:考虑到逻辑简洁,提供的本小节的示例并不是一个一个群,而是所有人在一个大的群聊中哈~
11.1、ChatSendToAllRequest 创建 ChatSendToOneRequest 类,发送给所有人的群聊消息的请求。
代码如下:
public class ChatSendToAllRequest implements Message {
}
PS:如果是正经的群聊,会有一个 groupId 字段,表示群编号。
11.2、ChatSendToAllHandler 服务端...
创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。
代码如下:
代码比较简单,看看 <1>、<2> 上的注释。
11.3、简单测试① 启动 Netty Server 服务端。
② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。
如下图所示:
③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。
④ 启动 Netty Client 客户端 C。注意,需要设置 --server.port 端口为 8082,避免冲突。
⑤ 最后使用 Postman 模拟一次发送群聊消息。
如下图所示:
同时,可以看到客户端 A 群发给所有客户端的日志如下:
最后,要想系统地学习 IM 开发的方方面面,请继续阅读:《新手入门一篇就够:从零开发移动端 IM》
附录、系列文章
《跟着源码学 IM(一):手把手教你用 Netty 实现心跳机制、断线重连机制》
《跟着源码学 IM(二):自已开发 IM 很难?手把手教你撸一个 Andriod 版 IM》
《跟着源码学 IM(三):基于 Netty,从零开发一个 IM 服务端》
《跟着源码学 IM(四):拿起键盘就是干,教你徒手开发一套分布式 IM 系统》
《跟着源码学 IM(五):正确理解 IM 长连接、心跳及重连机制,并动手实现》
《跟着源码学 IM(六):手把手教你用 Go 快速搭建高性能、可扩展的 IM 系统》
《跟着源码学 IM(七):手把手教你用 WebSocket 打造 Web 端 IM 聊天》
《跟着源码学 IM(八):万字长文,手把手教你用 Netty 打造 IM 聊天》(* 本文)
评论