Master the Technical Principles Behind Building an IoT MQTT Gateway
- 2023-02-02, Hong Kong, China

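This article is shared from the Huawei Cloud community post "一文带你掌握物联网mqtt网关搭建背后的技术原理" (Master the Technical Principles Behind Building an IoT MQTT Gateway), by 张俭.
Preface
IoT is currently a popular software field. Many IoT vendors run their own IoT platform, and one of the core modules of such a platform is the MQTT gateway. This article walks through writing an MQTT gateway step by step. The gateway uses Kafka or Pulsar as backend storage and supports MQTT connect, disconnect, publish, and subscribe. Technology choices:
- Netty: the most popular networking framework for Java
- netty-codec-mqtt: a Netty subproject that provides the MQTT codec
- Pulsar/Kafka: popular message middleware, used as backend storage
The core pom dependencies are as follows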
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-codec-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-common</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-original</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>${mqtt-client.version}</version>
            <scope>test</scope>
        </dependency>
Software parameter design
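Configuration parameters are everywhere in software. A complex open-source project can have hundreds of them, with configuration files running to thousands of lines. The configuration we need is
Port that MqttServer listens on
Making the listening port configurable is necessary even for a demo, because it is used all the time in unit tests. After a test finishes, the operating system does not necessarily release the port right away even though the server has been closed, so letting tests pick a random port is important. In Java, a utility class like the following obtains a free port. When no port is configured, we use the default MQTT port 1883.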
package io.github.protocol.mqtt.broker.util;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class SocketUtil {

    public static int getFreePort() {
        // Binding to port 0 lets the operating system choose a free port
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
Backend storage configuration
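Our MQTT gateway has no reliable storage of its own; it relies on the backend message middleware for persistence. The backend is planned to support two types, Pulsar and Kafka. The enum is defined as follows
public enum ProcessorType {
    KAFKA,
    PULSAR,
}
The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple and only need the basic connection addresses. More options will be needed here later for performance tuning and security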
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
public class KafkaProcessorConfig {

    private String bootstrapServers = "localhost:9092";

    public KafkaProcessorConfig() {
    }
}

@Setter
@Getter
public class PulsarProcessorConfig {

    private String httpUrl = "http://localhost:8080";

    private String serviceUrl = "pulsar://localhost:6650";

    public PulsarProcessorConfig() {
    }
}
Starting the netty MqttServer
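The server code in this section takes an MqttServerConfig, which the article does not list. The following is a minimal sketch of what such a class might look like, using plain Java instead of Lombok. The field names are inferred from the getters that MqttServer calls; the default backend (Kafka) is an assumption, and the mqttAuth field is left out because the MqttAuth type is not shown either.

```java
// Hypothetical sketch: MqttServerConfig is not listed in the article, so the
// fields and defaults here are inferred from the getters MqttServer calls.
enum ProcessorType { KAFKA, PULSAR }

class KafkaProcessorConfig {
    private String bootstrapServers = "localhost:9092";

    public String getBootstrapServers() { return bootstrapServers; }
}

class PulsarProcessorConfig {
    private String httpUrl = "http://localhost:8080";
    private String serviceUrl = "pulsar://localhost:6650";

    public String getHttpUrl() { return httpUrl; }
    public String getServiceUrl() { return serviceUrl; }
}

class MqttServerConfig {
    // 1883 is the default MQTT port; 0 asks the server to pick a free port.
    private int port = 1883;
    // Assumption: Kafka as the default backend; the article does not say.
    private ProcessorType processorType = ProcessorType.KAFKA;
    private KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
    private PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();

    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public ProcessorType getProcessorType() { return processorType; }
    public void setProcessorType(ProcessorType processorType) { this.processorType = processorType; }
    public KafkaProcessorConfig getKafkaProcessorConfig() { return kafkaProcessorConfig; }
    public PulsarProcessorConfig getPulsarProcessorConfig() { return pulsarProcessorConfig; }
}
```

A unit test would typically call setPort(0) before constructing the server so that a free port is chosen.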
We use Netty to start an mqttServer and add the MQTT codec to the pipeline
package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

    private final MqttServerConfig mqttServerConfig;

    public MqttServer() {
        this(new MqttServerConfig());
    }

    public MqttServer(MqttServerConfig mqttServerConfig) {
        this.mqttServerConfig = mqttServerConfig;
        // Port 0 means "pick any free port", which is useful in unit tests
        if (mqttServerConfig.getPort() == 0) {
            mqttServerConfig.setPort(SocketUtil.getFreePort());
        }
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private MqttProcessor processor(MqttServerConfig config) {
        return switch (config.getProcessorType()) {
            case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
            case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
        };
    }

    public int getPort() {
        return mqttServerConfig.getPort();
    }

}
MqttServerStarter.java
We write a simple main function that starts the mqttServer, which makes debugging easier
package io.github.protocol.mqtt.broker;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }

}
Testing with the Eclipse MQTT client
package io.github.protocol.mqtt;

import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Log4j2
public class MqttClientPublishExample {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishExample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            log.info("Connecting to broker: {}", broker);
            sampleClient.connect(connOpts);
            log.info("Connected");
            log.info("Publishing message: {}", content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            log.info("Message published");
            sampleClient.disconnect();
            log.info("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
        }
    }

}
We then start MqttServer first and MqttClient second, and find that MqttClient hangs at
Connecting to broker: tcp://127.0.0.1:1883
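Why is that? A packet capture shows that only the client sent an MQTT CONNECT packet; the server never responded.

According to the MQTT specification, however, a CONNECT packet must be answered with a CONNACK.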
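To make the missing response concrete, here is a small sketch of what a CONNACK looks like on the wire in MQTT 3.1.1: a fixed four-byte packet. The class and method names are illustrative only; in the gateway itself the netty-codec-mqtt encoder produces these bytes.

```java
// Illustrative helper, not part of the gateway: hand-encodes an MQTT 3.1.1 CONNACK.
final class ConnAckBytes {

    /** Return code 0 means "connection accepted" in MQTT 3.1.1. */
    static final int CONNECTION_ACCEPTED = 0x00;

    static byte[] encode(boolean sessionPresent, int returnCode) {
        return new byte[] {
                0x20,                                  // fixed header: packet type 2 (CONNACK), flags 0
                0x02,                                  // remaining length: always 2 bytes
                (byte) (sessionPresent ? 0x01 : 0x00), // connect acknowledge flags
                (byte) returnCode                      // connect return code
        };
    }
}
```

A client blocked in connect() is waiting for exactly these four bytes, which is why it hangs when the server sends nothing.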
 
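So after receiving CONNECT we need to return a CONNACK message. We create an MqttHandler that extends ChannelInboundHandlerAdapter and takes over the messages produced by MqttDecoder. The key methods to override are channelRead and channelInactive; the latter releases the resources that must be freed when the connection drops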
package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

}
Then add this handler to Netty's handler chain (the channel pipeline), after the codec; for example, with a p.addLast(new MqttHandler()) call following the decoder and encoder in initChannel.

Insert our code into the MQTT handler
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
    }
The connectMessage is printed as follows
[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
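The remainingLength=22 in the log can be checked by hand. In MQTT 3.1.1 every string field is a two-byte length prefix followed by its UTF-8 bytes, and the CONNECT variable header is the protocol name, one protocol-level byte, one connect-flags byte, and a two-byte keep-alive. The sketch below assumes a CONNECT with no will, username, or password, as in this test; the class name is illustrative.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper: recomputes the CONNECT remaining length for a
// packet that carries only a client identifier in its payload.
final class ConnectLength {

    /** Length of an MQTT UTF-8 string field: 2-byte prefix plus the encoded bytes. */
    static int utf8FieldLength(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static int remainingLength(String protocolName, String clientId) {
        int variableHeader = utf8FieldLength(protocolName) // "MQTT" -> 6 bytes
                + 1   // protocol level (version=4)
                + 1   // connect flags
                + 2;  // keep alive
        int payload = utf8FieldLength(clientId);           // "JavaSample" -> 12 bytes
        return variableHeader + payload;
    }
}
```

For the protocol name MQTT and the client identifier JavaSample this gives 10 + 12 = 22, matching the decoded fixed header.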
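A CONNECT message usually carries information such as the will QoS, username, and password. Because our client did not send a username or password, those fields are null here. For now we skip validating them and simply return a CONNACK message to the client to indicate that the connection succeeded
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
Run the server and client again: the client now gets past the CONNECT phase and enters the publish phase. Next we implement the remaining scenarios

The MqttHandler code at this stage is attached below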
package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload connectPayload = connectMessage.payload();
        // Accept every connection for now; validation is added later
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
    }

}
At this point all the logic lives in MqttHandler, which makes later extension awkward. We extract an MqttProcessor interface to handle the individual requests, while MqttHandler parses the MqttMessage type and dispatches. The MqttProcessor interface is designed as follows
package io.github.protocol.mqtt.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;

public interface MqttProcessor {
    void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
    void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
    void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
    void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
    void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
    void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
    void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
    void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
    void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processDisconnect(ChannelHandlerContext ctx) throws Exception;
    void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
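}
We allow these methods to throw exceptions. When a failure is very hard to handle (for example, the backend storage is down), we drop the MQTT connection and wait for the client to reconnect.
MqttHandler calls MqttProcessor; the relevant MqttHandler code is as follows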
        Preconditions.checkArgument(message instanceof MqttMessage);
        MqttMessage msg = (MqttMessage) message;
        try {
            if (msg.decoderResult().isFailure()) {
                Throwable cause = msg.decoderResult().cause();
                if (cause instanceof MqttUnacceptableProtocolVersionException) {
                    // Unsupported protocol version
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK,
                                    false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(
                                    MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
                                    false), null);
                    ctx.writeAndFlush(connAckMessage);
                    log.error("connection refused due to invalid protocol, client address [{}]",
                            ctx.channel().remoteAddress());
                    ctx.close();
                    return;
                } else if (cause instanceof MqttIdentifierRejectedException) {
                    // ineligible clientId
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK,
                                    false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
                                    false), null);
                    ctx.writeAndFlush(connAckMessage);
                    log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
                    ctx.close();
                    return;
                }
                throw new IllegalStateException(msg.decoderResult().cause().getMessage());
            }
            MqttMessageType messageType = msg.fixedHeader().messageType();
            if (log.isDebugEnabled()) {
                log.debug("Processing MQTT Inbound handler message, type={}", messageType);
            }
            switch (messageType) {
                case CONNECT:
                    Preconditions.checkArgument(msg instanceof MqttConnectMessage);
                    processor.processConnect(ctx, (MqttConnectMessage) msg);
                    break;
                case CONNACK:
                    Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
                    processor.processConnAck(ctx, (MqttConnAckMessage) msg);
                    break;
                case PUBLISH:
                    Preconditions.checkArgument(msg instanceof MqttPublishMessage);
                    processor.processPublish(ctx, (MqttPublishMessage) msg);
                    break;
                case PUBACK:
                    Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
                    processor.processPubAck(ctx, (MqttPubAckMessage) msg);
                    break;
                case PUBREC:
                    processor.processPubRec(ctx, msg);
                    break;
                case PUBREL:
                    processor.processPubRel(ctx, msg);
                    break;
                case PUBCOMP:
                    processor.processPubComp(ctx, msg);
                    break;
                case SUBSCRIBE:
                    Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
                    processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
                    break;
                case SUBACK:
                    Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
                    processor.processSubAck(ctx, (MqttSubAckMessage) msg);
                    break;
                case UNSUBSCRIBE:
                    Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
                    processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                    break;
                case UNSUBACK:
                    Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
                    processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
                    break;
                case PINGREQ:
                    processor.processPingReq(ctx, msg);
                    break;
                case PINGRESP:
                    processor.processPingResp(ctx, msg);
                    break;
                case DISCONNECT:
                    processor.processDisconnect(ctx);
                    break;
                case AUTH:
                    processor.processAuth(ctx, msg);
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
            }
        } catch (Throwable ex) {
            ReferenceCountUtil.safeRelease(msg);
            log.error("Exception was caught while processing MQTT message, ", ex);
            ctx.close();
        }
This code mainly calls a different MqttProcessor method for each MqttMessage type. Two points are worth mentioning
- Some decoding failures are checked up front, so the handler fails fast
- Exceptions are caught globally, and the connection is closed when one occurs
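The two points can be isolated from Netty in a few lines. The following is a simplified sketch, not the gateway's code: Handler, dispatch, and closeQuietly are hypothetical names, and a plain Closeable stands in for the channel.

```java
import java.io.Closeable;
import java.io.IOException;

// Simplified sketch of "fail fast on decode errors, close the connection on any exception".
final class DispatchSketch {

    interface Handler {
        void handle(String payload) throws Exception;
    }

    /** Returns true when the message was handled; on any failure the connection is closed. */
    static boolean dispatch(boolean decodeFailed, String payload, Handler handler, Closeable connection) {
        try {
            if (decodeFailed) {
                // Fail fast: never pass a half-decoded message to the business logic
                throw new IllegalStateException("decode failure");
            }
            handler.handle(payload);
            return true;
        } catch (Throwable t) {
            // Global catch: drop the connection and let the client reconnect
            closeQuietly(connection);
            return false;
        }
    }

    static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // nothing sensible left to do while already handling a failure
        }
    }
}
```

Closing on every failure keeps the error handling in one place, at the cost of a reconnect for errors that might have been recoverable.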
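Maintaining the MqttSession
The MQTT session is maintained mainly to keep track of client session information and of the resources the client occupies in the system. Because the MqttSession has to be maintained no matter which backend is used, we build an AbstractProcessor to do it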
package io.github.protocol.mqtt.broker.processor;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.util.ChannelUtils;
import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.stream.IntStream;

@Slf4j
public abstract class AbstractProcessor implements MqttProcessor {

    protected final MqttAuth mqttAuth;
    public AbstractProcessor(MqttAuth mqttAuth) {
        this.mqttAuth = mqttAuth;
    }

    @Override
    public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
        String clientId = msg.payload().clientIdentifier();
        String username = msg.payload().userName();
        byte[] pwd = msg.payload().passwordInBytes();
        if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK,
                            false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
                            false), null);
            ctx.writeAndFlush(connAckMessage);
            log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        if (!mqttAuth.connAuth(clientId, username, pwd)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK,
                            false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
                            false), null);
            ctx.writeAndFlush(connAckMessage);
            // Authentication failed (the original logged the "cannot be empty" message here)
            log.error("auth failed, bad username or password, client address[{}]", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }

        MqttSessionKey mqttSessionKey = new MqttSessionKey();
        mqttSessionKey.setUsername(username);
        mqttSessionKey.setClientId(clientId);
        ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
        log.info("username {} clientId {} remote address {} connected",
                username, clientId, ctx.channel().remoteAddress());
        onConnect(mqttSessionKey);
        MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK,
                        false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
                null);
        ctx.writeAndFlush(mqttConnectMessage);
    }

    protected void onConnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
    public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
            log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
            return;
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
            log.error("does not support QoS2 protocol. clientId {}, username {} ",
                    mqttSession.getClientId(), mqttSession.getUsername());
            return;
        }
        onPublish(ctx, mqttSession, msg);
    }

    protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             MqttPublishMessage msg) throws Exception {
    }

    @Override
    public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            // Stop here: without a session there is nothing to subscribe for
            return;
        }
        onSubscribe(ctx, mqttSession, msg.payload());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
                false, MqttQoS.AT_MOST_ONCE, false, 0);
        // Grant each subscription the QoS level it requested
        IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
        MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
        ctx.writeAndFlush(MqttMessageFactory.newMessage(
                fixedHeader,
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                payload));
    }

    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
    }

    @Override
    public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        ctx.writeAndFlush(MqttMessageUtil.pingResp());
    }

    @Override
    public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
            // No session was ever created, so there is nothing to clean up
            return;
        }
        onDisconnect(mqttSession);
    }

    protected void onDisconnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
    public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }
}
As you can see, AbstractProcessor mainly maintains and validates the MqttSessionKey, and intercepts the QoS 2 and FAILURE levels that publish does not support. It also responds to MQTT heartbeat (PINGREQ) requests. As before, onPublish and onSubscribe are allowed to throw exceptions.
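The MqttSessionKey class itself is not listed in the article. Because it is used as a HashMap key in the shared processor code, it needs value-based equals and hashCode. The following is a hypothetical sketch of such a class in plain Java; the real class may well use Lombok annotations instead.

```java
import java.util.Objects;

// Hypothetical sketch of MqttSessionKey; the article's real class is not shown.
final class MqttSessionKey {

    private String username;
    private String clientId;

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getClientId() { return clientId; }
    public void setClientId(String clientId) { this.clientId = clientId; }

    // Two keys are equal when both username and clientId match,
    // so a session can be looked up from a freshly built key.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MqttSessionKey)) {
            return false;
        }
        MqttSessionKey other = (MqttSessionKey) o;
        return Objects.equals(username, other.username) && Objects.equals(clientId, other.clientId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(username, clientId);
    }

    @Override
    public String toString() {
        return "MqttSessionKey{username=" + username + ", clientId=" + clientId + "}";
    }
}
```

Since the key is mutable, both fields should be set before it is stored in a map and left unchanged afterwards.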
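The basic idea of an MQTT gateway built on a message queue is simple: when a publish message arrives, produce it to the message queue; when a client subscribes, pull messages from the message queue. It follows that we may need to maintain the mapping between each MQTT topic and its producer and consumer, because consumers in message middleware such as Kafka and Pulsar are bound to topics. A fragment of the shared code is as follows: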
    protected final ReentrantReadWriteLock.ReadLock rLock;
    protected final ReentrantReadWriteLock.WriteLock wLock;
    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
    protected final Map<MqttTopicKey, P> producerMap;
    protected final Map<MqttTopicKey, C> consumerMap;
    public AbstractMqProcessor(MqttAuth mqttAuth) {
        super(mqttAuth);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        rLock = lock.readLock();
        wLock = lock.writeLock();
        this.sessionProducerMap = new HashMap<>();
        this.sessionConsumerMap = new HashMap<>();
        this.producerMap = new HashMap<>();
        this.consumerMap = new HashMap<>();
    }

    @Override
    protected void onConnect(MqttSessionKey mqttSessionKey) {
        wLock.lock();
        try {
            sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
            sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
        } finally {
            wLock.unlock();
        }
    }

    @Override
    protected void onDisconnect(MqttSessionKey mqttSessionKey) {
        wLock.lock();
        try {
            // find producers
            List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
            if (produceTopicKeys != null) {
                for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
                    P producer = producerMap.get(mqttTopicKey);
                    if (producer != null) {
                        ClosableUtils.close(producer);
                        producerMap.remove(mqttTopicKey);
                    }
                }
            }
            sessionProducerMap.remove(mqttSessionKey);
            // find consumers
            List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
            if (consumeTopicKeys != null) {
                for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
                    C consumer = consumerMap.get(mqttTopicKey);
                    if (consumer != null) {
                        ClosableUtils.close(consumer);
                        consumerMap.remove(mqttTopicKey);
                    }
                }
            }
            sessionConsumerMap.remove(mqttSessionKey);
        } finally {
            wLock.unlock();
        }
    }
}
Kafka processor implementation
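Because a Kafka producer is not bound to a topic, we can reuse one producer in the Kafka processor. If a single Kafka producer later reaches its performance ceiling, we can extend it to a list of Kafka producers and round-robin over them. Consumers are different: the MQTT protocol may require different behavior for each subscribed topic, so sharing one consumer instance is not appropriate. We start the KafkaProducer in the constructor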
    private final KafkaProcessorConfig kafkaProcessorConfig;

    private final KafkaProducer<String, ByteBuffer> producer;

    public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
        super(mqttAuth);
        this.kafkaProcessorConfig = kafkaProcessorConfig;
        this.producer = createProducer();
    }

    protected KafkaProducer<String, ByteBuffer> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
        return new KafkaProducer<>(properties);
    }
Handling the MqttPublish message. An MqttPublish message carries the following key parameters
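MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
String topic = publishMessage.variableHeader().topicName();
ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();
where
- qos is the quality-of-service level of the message: 0 gives no delivery guarantee (at most once), 1 means at least once, and 2 means exactly once. Only qos0 and qos1 are currently supported
- topicName is the name of the topic
- ByteBuffer is the message payload
We send the message according to the topic and qos. The code is as follows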
        String topic = msg.variableHeader().topicName();
        ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
        switch (msg.fixedHeader().qosLevel()) {
            // qos0: fire and forget, only log the result in the callback
            case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
                    return;
                }
                log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                        mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
            });
            // qos1: wait for kafka to confirm the write, then reply with PUBACK
            case AT_LEAST_ONCE -> {
                try {
                    RecordMetadata recordMetadata = producer.send(record).get();
                    log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                            mqttSessionKey, recordMetadata.topic(),
                            recordMetadata.partition(), recordMetadata.offset());
                    ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
                } catch (Exception e) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
                }
            }
            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }
Handling subscribe messages. For now we simply take the subscribed topic, create it if needed, and consume from it. The native Kafka client recommends the following blocking poll loop for consumption
while (true) {
  ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
  for (ConsumerRecord<String, byte[]> record : records) {
    // do logic
  }
}
Because this loop blocks, the consumer has to be driven from a separate thread. We write a wrapper, KafkaConsumerListenerWrapper, that converts the loop into an asynchronous listener-based consumption model
package io.github.protocol.mqtt.broker.processor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Slf4j
public class KafkaConsumerListenerWrapper implements AutoCloseable {

    private final AdminClient adminClient;

    private final KafkaConsumer<String, byte[]> consumer;

    public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {
        Properties adminProperties = new Properties();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        this.adminClient = KafkaAdminClient.create(adminProperties);
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);
    }

    public void start(String topic, KafkaMessageListener listener) throws Exception {
        try {
            TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))
                    .values().get(topic).get();
            log.info("topic info is {}", topicDescription);
        } catch (ExecutionException ee) {
            if (ee.getCause() instanceof UnknownTopicOrPartitionException) {
                log.info("topic {} not exist, create it", topic);
                adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));
            } else {
                log.error("find topic info {} error", topic, ee);
            }
        } catch (Exception e) {
            throw new IllegalStateException("find topic info error", e);
        }
        consumer.subscribe(Collections.singletonList(topic));
        log.info("consumer topic {} start", topic);
        // run the blocking poll loop on its own thread and hand records to the listener
        new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, byte[]> record : records) {
                        listener.messageReceived(record);
                    }
                }
            } catch (WakeupException we) {
                // close() called wakeup(), exit the loop and release the consumer
                consumer.close();
            } catch (Exception e) {
                log.error("consumer topic {} consume error", topic, e);
                consumer.close();
            }
        }).start();
        Thread.sleep(5_000);
    }
    @Override
    public void close() throws Exception {
        log.info("wake up {} consumer", consumer);
        consumer.wakeup();
    }
}
Back in the Kafka processor, one wrapper is created for each subscribed topic
    @Override
    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
        for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
            KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());
            subscribe(ctx, consumer, topicSubscription.topicName());
        }
    }

    private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
        MqttTopicKey mqttTopicKey = new MqttTopicKey();
        mqttTopicKey.setTopic(topic);
        mqttTopicKey.setMqttSessionKey(mqttSessionKey);

        wLock.lock();
        try {
            KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
            if (consumer == null) {
                consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
                sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
                    if (mqttTopicKeys == null) {
                        mqttTopicKeys = new ArrayList<>();
                    }
                    mqttTopicKeys.add(mqttTopicKey);
                    return mqttTopicKeys;
                });
                consumerMap.put(mqttTopicKey, consumer);
            }
            return consumer;
        } finally {
            wLock.unlock();
        }
    }
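    protected void subscribe(ChannelHandlerContext ctx,
                             KafkaConsumerListenerWrapper consumer, String topic) throws Exception {
        BoundInt boundInt = new BoundInt(65535);
        consumer.start(topic, record -> {
            log.info("receive message from kafka, topic {}, partition {}, offset {}",
                    record.topic(), record.partition(), record.offset());
            MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
                    MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
            ctx.writeAndFlush(mqttPublishMessage);
        });
    }
One point in the code above deserves attention throughout: when logging, include the key identifying information, such as the topic, the MQTT username, and the MQTT clientId. It does not seem to matter while writing a demo, but when you have to locate a problem among a massive number of requests, you will see how crucial this information is.
The simple utility class BoundInt generates the packetId, which the protocol limits to 16 bits (at most 65535). Note that MQTT requires the packet identifier of a QoS 1 PUBLISH to be non-zero, so 0 should not be used as an id.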
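As an illustration of the packet id requirement, here is a minimal sketch of a wrapping id generator. PacketIdGenerator is a hypothetical name used only for this example; it is not the article's BoundInt, whose implementation is not shown in this section. It assumes ids should cycle through 1..65535 and skip 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not the article's BoundInt): hands out MQTT packet ids
 * in the range 1..65535 and wraps back to 1 after 65535.
 */
final class PacketIdGenerator {

    private static final int MAX_PACKET_ID = 65535;

    // 0 is only the initial state; it is never returned to callers.
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the next id; safe to call from several threads. */
    int next() {
        return counter.updateAndGet(prev -> prev >= MAX_PACKET_ID ? 1 : prev + 1);
    }
}
```

A per-connection instance is enough, because packet ids only need to be unique among the in-flight messages of one session.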
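Pulsar processor implementation
Compared with Kafka, Pulsar is better suited as the backend of an MQTT gateway, for the following reasons:
- Pulsar supports millions of topics, and its topics are more lightweight
- Pulsar natively supports a listener-based consumption mode, so there is no need to start a thread for every consumer
- Pulsar supports the shared subscription mode, which makes consumption more flexible
- A Pulsar consumer's subscribe call guarantees that the subscription has been created successfully; the Kafka consumer offers no such semantic guarantee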
    protected final ReentrantReadWriteLock.ReadLock rLock;

    protected final ReentrantReadWriteLock.WriteLock wLock;

    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;

    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;

    protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;

    protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;

    private final PulsarProcessorConfig pulsarProcessorConfig;

    private final PulsarAdmin pulsarAdmin;

    private final PulsarClient pulsarClient;

    public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
        super(mqttAuth);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        rLock = lock.readLock();
        wLock = lock.writeLock();
        this.sessionProducerMap = new HashMap<>();
        this.sessionConsumerMap = new HashMap<>();
        this.producerMap = new HashMap<>();
        this.consumerMap = new HashMap<>();
        this.pulsarProcessorConfig = pulsarProcessorConfig;
        try {
            this.pulsarAdmin = PulsarAdmin.builder()
                    .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
                    .build();
            this.pulsarClient = PulsarClient.builder()
                    .serviceUrl(pulsarProcessorConfig.getServiceUrl())
                    .build();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create pulsar client", e);
        }
    }
Handling publish messages
    @Override
    protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             MqttPublishMessage msg) throws Exception {
        String topic = msg.variableHeader().topicName();
        Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
        // copy the payload out of the netty buffer without moving its reader index
        int len = msg.payload().readableBytes();
        byte[] messageBytes = new byte[len];
        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
        switch (msg.fixedHeader().qosLevel()) {
            case AT_MOST_ONCE -> producer.sendAsync(messageBytes)
                    .thenAccept(messageId -> log.info("clientId [{}],"
                                    + " username [{}]. send message to pulsar success messageId: {}",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
                    .exceptionally((e) -> {
                        log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
                                mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
                        return null;
                    });
            case AT_LEAST_ONCE -> {
                try {
                    MessageId messageId = producer.send(messageBytes);
                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
                            false, MqttQoS.AT_MOST_ONCE, false, 0);
                    MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
                            MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
                    log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
                    ctx.writeAndFlush(pubAckMessage);
                } catch (PulsarClientException e) {
                    log.error("clientId [{}], username [{}]. send pulsar error: {}",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());
                }
            }
            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }
    }
    private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
        MqttTopicKey mqttTopicKey = new MqttTopicKey();
        mqttTopicKey.setTopic(topic);
        mqttTopicKey.setMqttSessionKey(mqttSessionKey);

        // fast path: look up an existing producer under the read lock
        rLock.lock();
        try {
            Producer<byte[]> producer = producerMap.get(mqttTopicKey);
            if (producer != null) {
                return producer;
            }
        } finally {
            rLock.unlock();
        }

        // slow path: re-check under the write lock, then create and register
        wLock.lock();
        try {
            Producer<byte[]> producer = producerMap.get(mqttTopicKey);
            if (producer == null) {
                producer = createProducer(topic);
                sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
                    if (mqttTopicKeys == null) {
                        mqttTopicKeys = new ArrayList<>();
                    }
                    mqttTopicKeys.add(mqttTopicKey);
                    return mqttTopicKeys;
                });
                producerMap.put(mqttTopicKey, producer);
            }
            return producer;
        } finally {
            wLock.unlock();
        }
    }

    protected Producer<byte[]> createProducer(String topic) throws Exception {
        return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
    }
Handling subscribe messages
    @Override
    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
        for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
            subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
        }
    }

    protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             String topic) throws Exception {
        MqttTopicKey mqttTopicKey = new MqttTopicKey();
        mqttTopicKey.setTopic(topic);
        mqttTopicKey.setMqttSessionKey(mqttSessionKey);

        wLock.lock();
        try {
            Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
            if (consumer == null) {
                consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
                sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
                    if (mqttTopicKeys == null) {
                        mqttTopicKeys = new ArrayList<>();
                    }
                    mqttTopicKeys.add(mqttTopicKey);
                    return mqttTopicKeys;
                });
                consumerMap.put(mqttTopicKey, consumer);
            }
        } finally {
            wLock.unlock();
        }
    }

    protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,
                                              String topic) throws Exception {
        BoundInt boundInt = new BoundInt(65535);
        try {
            PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
            log.info("topic {} partitioned stats {}", topic, partitionedStats);
        } catch (PulsarAdminException.NotFoundException nfe) {
            log.info("topic {} not found", topic);
            pulsarAdmin.topics().createPartitionedTopic(topic, 1);
        }
        return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
                .messageListener((consumer, msg) -> {
                    log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
                    MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
                            MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
                    ctx.writeAndFlush(mqttPublishMessage);
                })
                .subscriptionName(username).subscribe();
    }
Test cases
Robust software should come with corresponding test cases. Here we write just two basic pub/sub cases; in a real production-ready project the test cases would be more complex and cover all kinds of failure scenarios. There is a saying that "unit tests are immediate encouragement for developers", and I very much agree with it
kafka
Starting a Kafka test broker
We can use the embedded-kafka-java project to start a Kafka broker for unit tests. Add the dependency with the following coordinates
    <dependency>
        <groupId>io.github.embedded-middleware</groupId>
        <artifactId>embedded-kafka-core</artifactId>
        <version>0.0.2</version>
        <scope>test</scope>
    </dependency>
We can then start a Kafka-backed MQTT broker with the following code
@Slf4j
public class MqttKafkaTestUtil {

    public static MqttServer setupMqttKafka() throws Exception {
        EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
        new Thread(() -> {
            try {
                embeddedKafkaServer.start();
            } catch (Exception e) {
                log.error("kafka broker started exception ", e);
            }
        }).start();
        Thread.sleep(5_000);
        MqttServerConfig mqttServerConfig = new MqttServerConfig();
        mqttServerConfig.setPort(0);
        mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
        KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
        kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
        mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
        MqttServer mqttServer = new MqttServer(mqttServerConfig);
        new Thread(() -> {
            try {
                mqttServer.start();
            } catch (Exception e) {
                log.error("mqsar broker started exception ", e);
            }
        }).start();
        Thread.sleep(5000L);
        return mqttServer;
    }

}
The Kafka end-to-end test case is fairly simple: publish a message through an MQTT client, then consume it
@Log4j2
public class MqttKafkaPubSubTest {

    @Test
    public void pubSubTest() throws Exception {
        MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
        String topic = UUID.randomUUID().toString();
        String content = "test-msg";
        String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
        String clientId = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(UUID.randomUUID().toString());
        connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
        connOpts.setCleanSession(true);
        log.info("Mqtt connecting to broker");
        sampleClient.connect(connOpts);
        CompletableFuture<String> future = new CompletableFuture<>();
        log.info("Mqtt subscribing");
        sampleClient.subscribe(topic, (s, mqttMessage) -> {
            log.info("messageArrived");
            future.complete(mqttMessage.toString());
        });
        log.info("Mqtt subscribed");
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        log.info("Mqtt message publishing");
        sampleClient.publish(topic, message);
        log.info("Mqtt message published");
        TimeUnit.SECONDS.sleep(3);
        sampleClient.disconnect();
        String msg = future.get(5, TimeUnit.SECONDS);
        Assertions.assertEquals(content, msg);
    }

}
pulsar
We can use the embedded-pulsar-java project to start a Pulsar broker for unit tests. Add the dependency with the following coordinates
    <dependency>
        <groupId>io.github.embedded-middleware</groupId>
        <artifactId>embedded-pulsar-core</artifactId>
        <version>0.0.2</version>
        <scope>test</scope>
    </dependency>
We can then start a Pulsar-backed MQTT broker with the following code
@Slf4j
public class MqttPulsarTestUtil {

    public static MqttServer setupMqttPulsar() throws Exception {
        EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
        embeddedPulsarServer.start();
        MqttServerConfig mqttServerConfig = new MqttServerConfig();
        mqttServerConfig.setPort(0);
        mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
        PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
        pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
        pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
        mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
        MqttServer mqttServer = new MqttServer(mqttServerConfig);
        new Thread(() -> {
            try {
                mqttServer.start();
            } catch (Exception e) {
                log.error("mqsar broker started exception ", e);
            }
        }).start();
        Thread.sleep(5000L);
        return mqttServer;
    }
}
The Pulsar end-to-end test case is fairly simple: publish a message through an MQTT client, then consume it
@Log4j2
public class MqttPulsarPubSubTest {

    @Test
    public void pubSubTest() throws Exception {
        MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
        String topic = UUID.randomUUID().toString();
        String content = "test-msg";
        String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
        String clientId = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(UUID.randomUUID().toString());
        connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
        connOpts.setCleanSession(true);
        log.info("Mqtt connecting to broker");
        sampleClient.connect(connOpts);
        CompletableFuture<String> future = new CompletableFuture<>();
        log.info("Mqtt subscribing");
        sampleClient.subscribe(topic, (s, mqttMessage) -> {
            log.info("messageArrived");
            future.complete(mqttMessage.toString());
        });
        log.info("Mqtt subscribed");
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        log.info("Mqtt message publishing");
        sampleClient.publish(topic, message);
        log.info("Mqtt message published");
        TimeUnit.SECONDS.sleep(3);
        sampleClient.disconnect();
        String msg = future.get(5, TimeUnit.SECONDS);
        Assertions.assertEquals(content, msg);
    }
}
Performance optimization
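Here we briefly describe a few performance optimizations. Parameter tuning such as thread counts and buffer sizes is not covered; those settings should be decided by actual load testing.
Use the Epoll network model on Linux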
public class EventLoopUtil {

    /**
     * @return an EventLoopGroup suitable for the current platform
     */
    public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(nThreads, threadFactory);
        } else {
            return new NioEventLoopGroup(nThreads, threadFactory);
        }
    }

    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
        if (eventLoopGroup instanceof EpollEventLoopGroup) {
            return EpollServerSocketChannel.class;
        } else {
            return NioServerSocketChannel.class;
        }
    }

}
Epoll.isAvailable() decides which event loop group is created, and when the channel type is specified, the type of the group determines the matching channel class
        EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-acceptor"));
        EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-worker"));
        b.group(acceptorGroup, workerGroup)
                // key point
                .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // decoder
                        p.addLast(new MqttDecoder());
                        p.addLast(MqttEncoder.INSTANCE);
                        p.addLast(new MqttHandler(processor(mqttServerConfig)));
                    }
                });
Disable TCP keepalive
The MQTT protocol has its own heartbeat mechanism, so TCP keepalive can be turned off and the gateway can rely on the MQTT-level heartbeat alone, which saves resources when there is a massive number of connections. Set ChannelOption.SO_KEEPALIVE to false. On a ServerBootstrap this is a child option, because it applies to the accepted connections rather than to the listening socket
                .childOption(ChannelOption.SO_KEEPALIVE, false)
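Relying on the MQTT-level heartbeat means the gateway itself has to notice silent connections. The sketch below shows one way the check could look, using only the JDK. KeepAliveTracker is a hypothetical class written for this example and is not part of the article's code. It assumes the rule from MQTT 3.1.1 that a server may drop a client after one and a half times the keep-alive interval without any packet, and that a keep-alive of 0 disables the check.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical per-connection tracker for the MQTT keep-alive rule.
 * Timestamps are passed in (for example from System.nanoTime()) so the
 * logic stays easy to test.
 */
final class KeepAliveTracker {

    private final long timeoutNanos;
    private volatile long lastSeenNanos;

    KeepAliveTracker(int keepAliveSeconds, long nowNanos) {
        // 1.5 x keep-alive grace period; 0 means the client disabled keep-alive
        this.timeoutNanos = TimeUnit.SECONDS.toNanos(keepAliveSeconds) * 3 / 2;
        this.lastSeenNanos = nowNanos;
    }

    /** Call whenever any control packet (PINGREQ, PUBLISH, ...) arrives. */
    void onPacket(long nowNanos) {
        lastSeenNanos = nowNanos;
    }

    /** True when the connection has been silent for longer than the grace period. */
    boolean isExpired(long nowNanos) {
        return timeoutNanos > 0 && nowNanos - lastSeenNanos > timeoutNanos;
    }
}
```

In a Netty pipeline the same role is commonly filled by an IdleStateHandler configured from the keep-alive value in the CONNECT packet; the class above only illustrates the timing rule.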
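Shorten timeouts
By default, the timeouts of the MQTT client used in the unit tests and the send timeouts of the Pulsar and Kafka producers are all relatively long (typically 30s). When the gateway is deployed on an internal network, these timeouts can be reduced to 5s to avoid pointless waiting.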
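For the Kafka side, a shorter timeout could be expressed as producer properties along the following lines. This is a sketch rather than the article's configuration: ShortTimeouts and its method are hypothetical names, and the values are only examples. The property keys are standard Kafka producer settings, and Kafka requires delivery.timeout.ms to be at least linger.ms plus request.timeout.ms, which is why the delivery timeout is derived from the other two.

```java
import java.util.Properties;

/** Hypothetical helper that builds shortened Kafka producer timeouts. */
final class ShortTimeouts {

    /**
     * @param requestTimeoutMs how long to wait for a broker response, e.g. 5000
     * @param lingerMs         batching delay that the delivery timeout must cover
     */
    static Properties kafkaProducerTimeouts(int requestTimeoutMs, int lingerMs) {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", String.valueOf(requestTimeoutMs));
        props.setProperty("linger.ms", String.valueOf(lingerMs));
        // must be >= linger.ms + request.timeout.ms, otherwise the producer rejects the config
        props.setProperty("delivery.timeout.ms", String.valueOf(requestTimeoutMs + lingerMs));
        return props;
    }
}
```

These properties would be merged into the ones passed to the KafkaProducer constructor. On the Pulsar side, the producer builder offers a sendTimeout setting for the same purpose.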
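Use multiple KafkaProducers to improve performance
A single KafkaProducer can hit the bandwidth limit of its TCP connection. Under a massive request load, when latency on the Kafka produce path stands out, consider starting several KafkaProducers. Given the traffic pattern of MQTT (many connections, low QPS per connection), the hash of mqttSessionKey can decide which KafkaProducer sends a message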
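The selection rule can be sketched independently of Kafka. SessionSharding below is a hypothetical helper for illustration only; it works on any list, so a list of strings stands in for the producers. It uses Math.floorMod, which always returns a non-negative index even for negative hash codes.

```java
import java.util.List;

/** Hypothetical helper: pins every session key to one element of a fixed pool. */
final class SessionSharding {

    /**
     * Picks the pool element for a session key. The same key always maps to
     * the same element, so messages of one session keep their relative order.
     */
    static <T> T pick(List<T> pool, Object sessionKey) {
        // floorMod keeps the index in [0, pool.size()) for any hash code
        return pool.get(Math.floorMod(sessionKey.hashCode(), pool.size()));
    }
}
```

Keeping a session on a single producer matters because messages sent through different producers have no ordering guarantee relative to each other.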
Add the following setting to KafkaProcessorConfig for the number of producers, which defaults to 1
    private int producerNum = 1;
During initialization, create an array of producers instead of a single producer
    this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
    for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
        producerArray[i] = createProducer();
    }
Wrap the producer lookup in a method
    private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
        return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
    }
Conclusion
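All the code in this article has been uploaded to GitHub. We have implemented only basic MQTT connect, publish, and subscribe here; even pausing and unsubscribing are not supported. A mature, commercial-grade MQTT gateway also needs user isolation, broader protocol support, reliability, operability, flow control, security, and more. If you need production-grade MQTT and cannot quickly build a mature MQTT gateway yourself, you can choose the Huawei Cloud IoTDA service, which provides a stable and reliable MQTT service and supports connecting massive numbers of devices to the cloud with two-way messaging between devices and the cloud.
Copyright notice: This is an original article by InfoQ author 【华为云开发者联盟】 (Huawei Cloud Developer Alliance).
Original link: 【http://xie.infoq.cn/article/15d34a2d1042101d319261da8】. Please contact the author before reposting.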
