写点什么

基于 Kotlin + Netty 实现一个简单的 TCP 自定义协议,kotlin 扩展函数原理

用户头像
Android架构
关注
发布于: 2 小时前

const val JSON: Byte = 0


}


}


每个 Packet 也包含了其对应的 command。下面是 Commands 是指令集,支持 256 个指令。


/**


  • 指令集,支持从 -128 到 127 总共 256 个指令


*/


interface Commands {


companion object {


/**


  • 心跳包


*/


const val HEART_BEAT: Byte = 0


/**


  • 登录(App 需要告诉 Watcher :cameraPosition 的位置)


*/


const val LOGIN: Byte = 1


......


}


}


由于使用自定义的协议,必须要有对报文的 encode、decode,PacketManager 负责这些事情。


encode 时按照协议的结构进行组装报文,同理 decode 是其逆向的过程。


/**


  • 报文的管理类,对报文进行 encode、decode


*/


object PacketManager {


fun encode(packet: Packet):ByteBuf = encode(ByteBufAllocator.DEFAULT, packet)


fun encode(alloc:ByteBufAllocator, packet: Packet) = encode(alloc.ioBuffer(), packet)


fun encode(buf: ByteBuf, packet: Packet): ByteBuf {


val serializer = SerializerFactory.getSerializer(packet.serializeMethod)


val bytes: ByteArray = serializer.serialize(packet)


//组装报文:魔数(4 字节)+ 版本号(1 字节)+ 序列化方式(1 字节)+ 指令(1 字节)+ 数据长度(4 字节)+ 数据(N 字节)


buf.writeInt(MAGIC_NUMBER)


buf.writeByte(packet.version.toInt())


buf.writeByte(packet.serializeMethod.toInt())


buf.writeByte(packet.command.toInt())


buf.writeInt(bytes.size)


buf.writeBytes(bytes)


return buf


}


fun decode(buf:ByteBuf): Packet {


buf.skipBytes(4) // 魔数由单独的 Handler 进行校验


buf.skipBytes(1)


val serializationMethod = buf.readByte()


val serializer = SerializerFactory.getSerializer(serializationMethod)


val command = buf.readByte()


val clazz = PacketFactory.getPacket(command)


val length = buf.readInt() // 数据的长度


val bytes = ByteArray(length) // 定义需要读取的字符数组


buf.readBytes(bytes)


return serializer.deserialize(clazz, bytes)


}


}


[](


)三. TCP 服务端


==========================================================================


启动 TCP 服务的方法


fun execute() {


boss = NioEventLoopGroup()


worker = NioEventLoopGroup()


val bootstrap = ServerBootstrap()


bootstrap.group(boss, worker).channel(NioServerSocketChannel::class.java)


.option(ChannelOption.SO_BACKLOG, 100)


.childOption(ChannelOption.SO_KEEPALIVE, true)


.childOption(ChannelOption.SO_REUSEADDR, true)


.childOption(ChannelOption.TCP_NODELAY, true)


.childHandler(object : ChannelInitializer<NioSocketChannel>() {


@Throws(Exception::class)


override fun initChannel(nioSocketChannel: NioSocketChannel) {


val pipeline = nioSocketChannel.pipeline()


pipeline.addLast(ServerIdleHandler())


pipeline.addLast(MagicNumValidator())


pipeline.addLast(PacketCodecHandler)


pipeline.addLast(HeartBeatHandler)


pipeline.addLast(ResponseHandler)


}


})


val future: ChannelFuture = bootstrap.bind(TCP_PORT)


future.addListener(object : ChannelFutureListener {


@Throws(Exception::class)


override fun operationComplete(channelFuture: ChannelFuture) {


if (channelFuture.isSuccess) {


logInfo(logger, "TCP Server is starting...")


} else {


logError(logger,channelFuture.cause(),"TCP Server failed")


}


}


})


}


其中,ServerIdleHandler: 表示 5 分钟内没有收到心跳,则断开连接。


class ServerIdleHandler : IdleStateHandler(0, 0, HERT_BEAT_TIME) {


private val logger: Logger = LoggerFactory.getLogger(ServerIdleHandler::class.java)


@Throws(Exception::class)


override fun channelIdle(ctx: ChannelHandlerContext, evt: IdleStateEvent) {


logInfo(logger) {


ctx.channel().close()


"$HERT_BEAT_TIME 秒内没有收到心跳,则断开连接"


}


}


companion object {


private const val HERT_BEAT_TIME = 300


}


}


MagicNumValidator:用于 TCP 报文的魔数校验。


class MagicNumValidator : LengthFieldBasedFrameDecoder(Int.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH) {


private val logger: Logger = LoggerFactory.getLogger(this.javaClass)


@Throws(Exception::class)


override fun decode(ctx: ChannelHandlerContext, in: ByteBuf): Any? {


if (in.getInt(in.readerIndex()) !== MAGIC_NUMBER) { // 魔数校验不通过,则关闭连接


logInfo(logger,"魔数校验失败")


ctx.channel().close()


return null


}


return super.decode(ctx, in)


}


companion object {


private const val LENGTH_FIELD_OFFSET = 7


private const val LENGTH_FIELD_LENGTH = 4


}


}


PacketCodecHandler: 解析报文的 Handler。


PacketCodecHandler 继承自 ByteToMessageCodec ,它是用来处理 byte-to-message 和 message-to-byte,便于解码字节消息成 POJO 或编码 POJO 消息成字节。


@ChannelHandler.Sharable


object PacketCodecHandler : MessageToMessageCodec<ByteBuf, Packet>() {


override fun encode(ctx: ChannelHandlerContext, msg: Packet, list: MutableList<Any>) {


val byteBuf = ctx.channel().alloc().ioBuffer()


PacketManager.encode(byteBuf, msg)


list.add(byteBuf)


}


override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, list: MutableList<Any>) {


list.add(PacketManager.decode(msg));


}


}


HeartBeatHandler:心跳的 Handler,接收 TCP 客户端发来的"ping",然后给客户端返回"pong"。


@ChannelHandle


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


r.Sharable


object HeartBeatHandler : SimpleChannelInboundHandler<HeartBeatPacket>(){


private val logger: Logger = LoggerFactory.getLogger(this.javaClass)


override fun channelRead0(ctx: ChannelHandlerContext, msg: HeartBeatPacket) {


logInfo(logger,"收到心跳包:${GsonUtils.toJson(msg)}")


msg.msg = "pong" // 返回 pong 给到客户端


ctx.writeAndFlush(msg)


}


}


ResponseHandler:通用的处理接收 TCP 客户端发来指令的 Handler,可以根据对应的指令去查询对应的 Handler 并处理其命令。


object ResponseHandler: SimpleChannelInboundHandler<Packet>() {


private val logger: Logger = LoggerFactory.getLogger(this.javaClass)


private val handlerMap: ConcurrentHashMap<Byte, SimpleChannelInboundHandler<out Packet>> = ConcurrentHashMap()


init {


handlerMap[LOGIN] = LoginHandler


......


handlerMap[ERROR] = ErrorHandler


}


override fun channelRead0(ctx: ChannelHandlerContext, msg: Packet) {


logInfo(logger,"收到客户端的指令: ${msg.command}")


val handler: SimpleChannelInboundHandler<out Packet>? = handlerMap[msg.command]


handler?.let {


logInfo(logger,"找到响应指令的 Handler: ${it.javaClass.simpleName}")


it.channelRead(ctx, msg)


} ?: logInfo(logger,"未找到响应指令的 Handler")


}


@Throws(Exception::class)


override fun channelInactive(ctx: ChannelHandlerContext) {


val insocket = ctx.channel().remoteAddress() as InetSocketAddress


val clientIP = insocket.address.hostAddress


val clientPort = insocket.port

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
基于 Kotlin + Netty 实现一个简单的 TCP 自定义协议,kotlin扩展函数原理