A Hands-On Guide to the Technology Behind Building an IoT MQTT Gateway



This article is shared from the Huawei Cloud community post "A Hands-On Guide to the Technology Behind Building an IoT MQTT Gateway", by Zhang Jian.

Preface


IoT is one of the hottest areas in software today. Most IoT vendors operate their own IoT platform, and one of its core modules is the MQTT gateway. This article walks you step by step through writing an MQTT gateway that uses Kafka or Pulsar as backend storage and supports MQTT connect, disconnect, publish, and subscribe. Technology choices:


  • Netty, the most popular networking framework in Java

  • netty-codec-mqtt, a Netty subproject that provides the MQTT codec

  • Pulsar/Kafka, popular message middleware used as the backend store


The core pom dependencies are as follows:


        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-codec-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-common</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-original</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>${mqtt-client.version}</version>
            <scope>test</scope>
        </dependency>

Configuration parameter design


Configuration parameters are everywhere; a complex open-source project can easily have hundreds of them, with configuration files running to thousands of lines. The configuration we need includes:

The port MqttServer listens on


Configuring the listening port is necessary even for a demo, and it matters most in unit tests: after a test finishes, the operating system does not release the port immediately even though the server has been closed, so binding a random free port in tests is essential. In Java we can obtain a free port with a utility class like the one below. If no port is configured, we fall back to MQTT's default port, 1883.


package io.github.protocol.mqtt.broker.util;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class SocketUtil {

    public static int getFreePort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

Backend storage configuration


The MQTT gateway itself has no reliable storage; it relies on backend message middleware for persistence. We plan to support two backends, Pulsar and Kafka, and define an enum for them:


public enum ProcessorType {
    KAFKA,
    PULSAR,
}


The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple and only contain the basic connection addresses; more options will come later for performance tuning and security.


@Setter
@Getter
public class KafkaProcessorConfig {

    private String bootstrapServers = "localhost:9092";

    public KafkaProcessorConfig() {
    }
}


@Setter
@Getter
public class PulsarProcessorConfig {

    private String httpUrl = "http://localhost:8080";

    private String serviceUrl = "pulsar://localhost:6650";

    public PulsarProcessorConfig() {
    }
}
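Both backend configs hang off a top-level MqttServerConfig, which the rest of the article references (getPort, getProcessorType, getMqttAuth, and so on) but never shows. A minimal sketch, assuming Lombok accessors and the field names used later; the defaults and the package placement of ProcessorType and MqttAuth are assumptions, not the author's code:

package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
public class MqttServerConfig {

    // 0 means "pick a random free port" (see the MqttServer constructor below)
    private int port = 1883;

    // which backend the gateway produces to and consumes from
    private ProcessorType processorType = ProcessorType.KAFKA;

    // pluggable authentication consulted on CONNECT (implementation not shown in the article)
    private MqttAuth mqttAuth;

    private KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();

    private PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
}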

Starting the Netty MqttServer


We start an MqttServer with Netty and add the MQTT decoder and encoder:


package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

    private final MqttServerConfig mqttServerConfig;

    public MqttServer() {
        this(new MqttServerConfig());
    }

    public MqttServer(MqttServerConfig mqttServerConfig) {
        this.mqttServerConfig = mqttServerConfig;
        if (mqttServerConfig.getPort() == 0) {
            mqttServerConfig.setPort(SocketUtil.getFreePort());
        }
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private MqttProcessor processor(MqttServerConfig config) {
        return switch (config.getProcessorType()) {
            case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
            case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
        };
    }

    public int getPort() {
        return mqttServerConfig.getPort();
    }

}

MqttServerStarter.java


We write a simple main method to start the MqttServer for easy debugging:


package io.github.protocol.mqtt.broker;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }
}


On the client side we test with the Eclipse Paho MQTT client:


package io.github.protocol.mqtt;

import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Log4j2
public class MqttClientPublishExample {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishExample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            log.info("Connecting to broker: {}", broker);
            sampleClient.connect(connOpts);
            log.info("Connected");
            log.info("Publishing message: {}", content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            log.info("Message published");
            sampleClient.disconnect();
            log.info("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
        }
    }
}


We run MqttServer first and then the MqttClient, and find that the client hangs:


Connecting to broker: tcp://127.0.0.1:1883


Why? A packet capture shows that only the client sent an MQTT CONNECT message; the server never responded.



According to the MQTT specification, however, a CONNECT message must be answered with a CONNACK.



So after receiving CONNECT we need to return a CONNACK. We create a MqttHandler extending ChannelInboundHandlerAdapter to pick up messages after MqttDecoder has decoded them. The key methods to override are channelRead, plus channelInactive to release the resources a connection holds when it drops.


package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }
}


Then add this handler to the Netty pipeline, right behind the decoder, as sketched below.
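The original post illustrates this step with a screenshot that did not survive extraction; a minimal sketch of the change to initChannel, mirroring the final pipeline shown later in the performance section (at this stage MqttHandler still has a no-argument constructor):

                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // codec first
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                            // our business handler sits behind the codec
                            p.addLast(new MqttHandler());
                        }
                    });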



Now insert our code into the MQTT handler:


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
    }


The logged connectMessage looks like this:


[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]


An MQTT CONNECT message usually carries the QoS, username, password, and so on. Since our client connected without a username or password, those fields are null here. For now we skip validation and simply return a CONNACK indicating a successful connection:


        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);


Running the server and client again, we can see the connection now gets past the CONNECT phase and moves on to publishing messages. Next we implement the remaining scenarios.



The MqttHandler code at this stage:


package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload connectPayload = connectMessage.payload();
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
    }
}


Right now all the logic lives inside MqttHandler, which makes it hard to extend. We extract a MqttProcessor interface to handle the actual requests, while MqttHandler only determines the MqttMessage type and dispatches it. The MqttProcessor interface is designed as follows:


package io.github.protocol.mqtt.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
public interface MqttProcessor {
void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
void processDisconnect(ChannelHandlerContext ctx) throws Exception;
void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
}


These methods are allowed to throw exceptions: when a failure is too hard to handle in place (for example, a backend storage outage), we simply drop the MQTT connection and wait for the client to reconnect.
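Dropping a connection also means releasing its resources. The article mentions overriding channelInactive for this but never shows it; a minimal sketch inside MqttHandler, where reusing processDisconnect as the cleanup hook is an assumption rather than the author's code:

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // treat an unexpected drop like a DISCONNECT so producers/consumers get closed
        try {
            processor.processDisconnect(ctx);
        } catch (Exception e) {
            log.error("cleanup on channel inactive failed", e);
        }
        super.channelInactive(ctx);
    }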


MqttHandler then calls into MqttProcessor; the relevant MqttHandler code is as follows:


        Preconditions.checkArgument(message instanceof MqttMessage);        MqttMessage msg = (MqttMessage) message;        try {            if (msg.decoderResult().isFailure()) {                Throwable cause = msg.decoderResult().cause();                if (cause instanceof MqttUnacceptableProtocolVersionException) {                    // Unsupported protocol version                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(                            new MqttFixedHeader(MqttMessageType.CONNACK,                                    false, MqttQoS.AT_MOST_ONCE, false, 0),                            new MqttConnAckVariableHeader(                                    MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,                                    false), null);                    ctx.writeAndFlush(connAckMessage);                    log.error("connection refused due to invalid protocol, client address [{}]",                            ctx.channel().remoteAddress());                    ctx.close();                    return;                } else if (cause instanceof MqttIdentifierRejectedException) {                    // ineligible clientId                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(                            new MqttFixedHeader(MqttMessageType.CONNACK,                                    false, MqttQoS.AT_MOST_ONCE, false, 0),                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,                                    false), null);                    ctx.writeAndFlush(connAckMessage);                    log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());                    ctx.close();                    return;                }                throw new IllegalStateException(msg.decoderResult().cause().getMessage());            }            MqttMessageType messageType = msg.fixedHeader().messageType();            if (log.isDebugEnabled()) {                log.debug("Processing MQTT Inbound handler message, type={}", messageType);            }            switch (messageType) {                case CONNECT:                    Preconditions.checkArgument(msg instanceof MqttConnectMessage);                    processor.processConnect(ctx, (MqttConnectMessage) msg);                    break;                case CONNACK:                    Preconditions.checkArgument(msg instanceof MqttConnAckMessage);                    processor.processConnAck(ctx, (MqttConnAckMessage) msg);                    break;                case PUBLISH:                    Preconditions.checkArgument(msg instanceof MqttPublishMessage);                    processor.processPublish(ctx, (MqttPublishMessage) msg);                    break;                case PUBACK:                    Preconditions.checkArgument(msg instanceof MqttPubAckMessage);                    processor.processPubAck(ctx, (MqttPubAckMessage) msg);                    break;                case PUBREC:                    processor.processPubRec(ctx, msg);                    break;                case PUBREL:                    processor.processPubRel(ctx, msg);                    break;                case PUBCOMP:                    processor.processPubComp(ctx, msg);                    break;                case SUBSCRIBE:                    Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);                    
processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);                    break;                case SUBACK:                    Preconditions.checkArgument(msg instanceof MqttSubAckMessage);                    processor.processSubAck(ctx, (MqttSubAckMessage) msg);                    break;                case UNSUBSCRIBE:                    Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);                    processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);                    break;                case UNSUBACK:                    Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);                    processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);                    break;                case PINGREQ:                    processor.processPingReq(ctx, msg);                    break;                case PINGRESP:                    processor.processPingResp(ctx, msg);                    break;                case DISCONNECT:                    processor.processDisconnect(ctx);                    break;                case AUTH:                    processor.processAuth(ctx, msg);                    break;                default:                    throw new UnsupportedOperationException("Unknown MessageType: " + messageType);            }        } catch (Throwable ex) {            ReferenceCountUtil.safeRelease(msg);            log.error("Exception was caught while processing MQTT message, ", ex);            ctx.close();        }


This code dispatches on the MqttMessage type and calls the corresponding MqttProcessor method. Two points are worth noting:


  • Decoder failures are checked up front, so we fail fast

  • Exceptions are caught globally, and the connection is closed on error

Maintaining the MqttSession


The MQTT session mainly keeps track of client session state and the resources a client occupies in the system. Since every backend implementation needs to maintain MQTT sessions, we build an abstract processor (AbstractProcessor in the code below) to maintain the MqttSession:


package io.github.protocol.mqtt.broker.processor;
import io.github.protocol.mqtt.broker.MqttSessionKey;import io.github.protocol.mqtt.broker.auth.MqttAuth;import io.github.protocol.mqtt.broker.util.ChannelUtils;import io.github.protocol.mqtt.broker.util.MqttMessageUtil;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.mqtt.MqttConnAckMessage;import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;import io.netty.handler.codec.mqtt.MqttConnectMessage;import io.netty.handler.codec.mqtt.MqttConnectReturnCode;import io.netty.handler.codec.mqtt.MqttFixedHeader;import io.netty.handler.codec.mqtt.MqttMessage;import io.netty.handler.codec.mqtt.MqttMessageFactory;import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;import io.netty.handler.codec.mqtt.MqttMessageType;import io.netty.handler.codec.mqtt.MqttPubAckMessage;import io.netty.handler.codec.mqtt.MqttPublishMessage;import io.netty.handler.codec.mqtt.MqttQoS;import io.netty.handler.codec.mqtt.MqttSubAckMessage;import io.netty.handler.codec.mqtt.MqttSubAckPayload;import io.netty.handler.codec.mqtt.MqttSubscribeMessage;import io.netty.handler.codec.mqtt.MqttSubscribePayload;import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;
import java.util.stream.IntStream;
@Slf4jpublic abstract class AbstractProcessor implements MqttProcessor {
protected final MqttAuth mqttAuth;
public AbstractProcessor(MqttAuth mqttAuth) { this.mqttAuth = mqttAuth; }
@Override public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception { String clientId = msg.payload().clientIdentifier(); String username = msg.payload().userName(); byte[] pwd = msg.payload().passwordInBytes(); if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); ctx.writeAndFlush(connAckMessage); log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress()); ctx.close(); return; } if (!mqttAuth.connAuth(clientId, username, pwd)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null); ctx.writeAndFlush(connAckMessage); log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress()); ctx.close(); return; }
MqttSessionKey mqttSessionKey = new MqttSessionKey(); mqttSessionKey.setUsername(username); mqttSessionKey.setClientId(clientId); ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey); log.info("username {} clientId {} remote address {} connected", username, clientId, ctx.channel().remoteAddress()); onConnect(mqttSessionKey); MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null); ctx.writeAndFlush(mqttConnectMessage); }
protected void onConnect(MqttSessionKey mqttSessionKey) { }
@Override public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("publish, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); return; } if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) { log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername()); return; } if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) { log.error("does not support QoS2 protocol. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername()); return; } onPublish(ctx, mqttSession, msg); }
protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttPublishMessage msg) throws Exception { }
@Override public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("sub, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } onSubscribe(ctx, mqttSession, msg.payload()); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value()); MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray()); ctx.writeAndFlush(MqttMessageFactory.newMessage( fixedHeader, MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), payload)); }
protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttSubscribePayload subscribePayload) throws Exception { }
@Override public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("unsub, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { ctx.writeAndFlush(MqttMessageUtil.pingResp()); }
@Override public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }
@Override public void processDisconnect(ChannelHandlerContext ctx) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress()); } onDisconnect(mqttSession); }
protected void onDisconnect(MqttSessionKey mqttSessionKey) { }
@Override public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("auth, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } }}


As you can see, AbstractProcessor mainly maintains and validates the MqttSessionKey, and rejects the unsupported QoS 2 and FAILURE levels on publish. It also answers MQTT heartbeat (PINGREQ) requests. As before, onPublish and onSubscribe are allowed to throw exceptions.
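The ChannelUtils helper used above to attach the MqttSessionKey to the Netty channel is not shown in the article; a minimal sketch based on Netty's AttributeKey (an assumption about the actual implementation):

package io.github.protocol.mqtt.broker.util;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

public class ChannelUtils {

    // one attribute per channel holding the authenticated session
    private static final AttributeKey<MqttSessionKey> MQTT_SESSION_KEY =
            AttributeKey.valueOf("mqttSessionKey");

    public static void setMqttSession(Channel channel, MqttSessionKey sessionKey) {
        channel.attr(MQTT_SESSION_KEY).set(sessionKey);
    }

    public static MqttSessionKey getMqttSession(Channel channel) {
        return channel.attr(MQTT_SESSION_KEY).get();
    }
}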


The basic idea of an MQTT gateway built on a message queue is simple: when a PUBLISH arrives, produce the message to the queue; when there is a subscription, pull messages from the queue. It follows that we need to maintain the mapping between each MQTT topic and its producer and consumer, because consumers in middleware such as Kafka and Pulsar are per-topic. The shared code fragment looks like this:


    protected final ReentrantReadWriteLock.ReadLock rLock;
protected final ReentrantReadWriteLock.WriteLock wLock;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
protected final Map<MqttTopicKey, P> producerMap;
protected final Map<MqttTopicKey, C> consumerMap;
public AbstractMqProcessor(MqttAuth mqttAuth) { super(mqttAuth); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); rLock = lock.readLock(); wLock = lock.writeLock(); this.sessionProducerMap = new HashMap<>(); this.sessionConsumerMap = new HashMap<>(); this.producerMap = new HashMap<>(); this.consumerMap = new HashMap<>(); }
@Override protected void onConnect(MqttSessionKey mqttSessionKey) { wLock.lock(); try { sessionProducerMap.put(mqttSessionKey, new ArrayList<>()); sessionConsumerMap.put(mqttSessionKey, new ArrayList<>()); } finally { wLock.unlock(); } }
@Override protected void onDisconnect(MqttSessionKey mqttSessionKey) { wLock.lock(); try { // find producers List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey); if (produceTopicKeys != null) { for (MqttTopicKey mqttTopicKey : produceTopicKeys) { P producer = producerMap.get(mqttTopicKey); if (producer != null) { ClosableUtils.close(producer); producerMap.remove(mqttTopicKey); } } } sessionProducerMap.remove(mqttSessionKey); List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey); if (consumeTopicKeys != null) { for (MqttTopicKey mqttTopicKey : consumeTopicKeys) { C consumer = consumerMap.get(mqttTopicKey); if (consumer != null) { ClosableUtils.close(consumer); consumerMap.remove(mqttTopicKey); } } } sessionConsumerMap.remove(mqttSessionKey); } finally { wLock.unlock(); } }}
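MqttSessionKey and MqttTopicKey serve as map keys above but are never defined in the article; a hedged sketch of these value classes (shown together here, in practice one file each), assuming Lombok generates the equals/hashCode needed for map keys:

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@EqualsAndHashCode
public class MqttSessionKey {
    private String username;
    private String clientId;
}

@Getter
@Setter
@EqualsAndHashCode
public class MqttTopicKey {
    private MqttSessionKey mqttSessionKey;
    private String topic;
}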

Kafka processor implementation


Since a Kafka producer is not tied to a topic, we can reuse one producer inside the Kafka processor. When a single KafkaProducer eventually hits its throughput ceiling, we can extend it to a list of producers and round-robin across them. Consumers are different: because MQTT may behave differently per subscribed topic, sharing one consumer instance is not appropriate. We create the KafkaProducer in the constructor:


    private final KafkaProcessorConfig kafkaProcessorConfig;

    private final KafkaProducer<String, ByteBuffer> producer;

    public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
        super(mqttAuth);
        this.kafkaProcessorConfig = kafkaProcessorConfig;
        this.producer = createProducer();
    }

    protected KafkaProducer<String, ByteBuffer> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
        return new KafkaProducer<>(properties);
    }


Handling the MQTT PUBLISH message. A PUBLISH message carries the following key parameters:


MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
String topic = publishMessage.variableHeader().topicName();
ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();


Where:


  • qos is the quality-of-service level of the message: 0 gives no guarantee, 1 means at least once, 2 means exactly once. Only QoS 0 and QoS 1 are supported for now

  • topicName is the name of the topic

  • byteBuffer is the message payload


We send the message according to its topic and QoS, as follows:


        String topic = msg.variableHeader().topicName();
        ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
        switch (msg.fixedHeader().qosLevel()) {
            case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
                    return;
                }
                log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                        mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
            });
            case AT_LEAST_ONCE -> {
                try {
                    RecordMetadata recordMetadata = producer.send(record).get();
                    log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                            mqttSessionKey, recordMetadata.topic(),
                            recordMetadata.partition(), recordMetadata.offset());
                    ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
                } catch (Exception e) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
                }
            }
            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }


For subscribe handling, for now we simply consume from the subscribed topic, creating it if needed. The consumption pattern recommended by the native Kafka client looks like this:


while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, byte[]> record : records) {
        // do logic
    }
}


So we need to poll the consumer on a separate thread. We write a KafkaConsumerListenerWrapper that converts this into an asynchronous, listener-style consumption model:


package io.github.protocol.mqtt.broker.processor;
import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.AdminClientConfig;import org.apache.kafka.clients.admin.KafkaAdminClient;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.admin.TopicDescription;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;import org.apache.kafka.common.errors.WakeupException;import org.apache.kafka.common.serialization.ByteArrayDeserializer;import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;import java.util.Collections;import java.util.Properties;import java.util.concurrent.ExecutionException;
@Slf4jpublic class KafkaConsumerListenerWrapper implements AutoCloseable {
private final AdminClient adminClient;
private final KafkaConsumer<String, byte[]> consumer;
public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) { Properties adminProperties = new Properties(); adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); this.adminClient = KafkaAdminClient.create(adminProperties); Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, username); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); this.consumer = new KafkaConsumer<>(properties); }
public void start(String topic, KafkaMessageListener listener) throws Exception { try { TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic)) .values().get(topic).get(); log.info("topic info is {}", topicDescription); } catch (ExecutionException ee) { if (ee.getCause() instanceof UnknownTopicOrPartitionException) { log.info("topic {} not exist, create it", topic); adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))); } else { log.error("find topic info {} error", topic, ee); } } catch (Exception e) { throw new IllegalStateException("find topic info error", e); } consumer.subscribe(Collections.singletonList(topic)); log.info("consumer topic {} start", topic); new Thread(() -> { try { while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, byte[]> record : records) { listener.messageReceived(record); } } } catch (WakeupException we) { consumer.close(); } catch (Exception e) { log.error("consumer topic {} consume error", topic, e); consumer.close(); } }).start(); Thread.sleep(5_000); }
@Override public void close() throws Exception { log.info("wake up {} consumer", consumer); consumer.wakeup(); }}
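The KafkaMessageListener callback invoked by the wrapper is not defined in the article; given that it is used as a lambda taking a ConsumerRecord, a minimal sketch might be:

package io.github.protocol.mqtt.broker.processor;

import org.apache.kafka.clients.consumer.ConsumerRecord;

@FunctionalInterface
public interface KafkaMessageListener {

    // called on the wrapper's poll thread for each record received from Kafka
    void messageReceived(ConsumerRecord<String, byte[]> record);
}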


    @Override    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                               MqttSubscribePayload subscribePayload) throws Exception {        for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {            KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());            subscribe(ctx, consumer, topicSubscription.topicName());        }    }
private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) { MqttTopicKey mqttTopicKey = new MqttTopicKey(); mqttTopicKey.setTopic(topic); mqttTopicKey.setMqttSessionKey(mqttSessionKey);
wLock.lock(); try { KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey); if (consumer == null) { consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername()); sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> { if (mqttTopicKeys == null) { mqttTopicKeys = new ArrayList<>(); } mqttTopicKeys.add(mqttTopicKey); return mqttTopicKeys; }); consumerMap.put(mqttTopicKey, consumer); } return consumer; } finally { wLock.unlock(); } }
protected void subscribe(ChannelHandlerContext ctx, KafkaConsumerListenerWrapper consumer, String topic) throws Exception { BoundInt boundInt = new BoundInt(65535); consumer.start(topic, record -> { log.info("receive message from kafka, topic {}, partition {}, offset {}", record.topic(), record.partition(), record.offset()); MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage( MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value()); ctx.writeAndFlush(mqttPublishMessage); }); }
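MqttMessageUtil (publishMessage, pubAckMessage, pingResp) is used throughout but never shown either; a hedged sketch built on the factories in netty-codec-mqtt, mirroring the constructions that do appear elsewhere in the article:

package io.github.protocol.mqtt.broker.util;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;

public class MqttMessageUtil {

    // empty PINGRESP, sent in reply to PINGREQ
    public static MqttMessage pingResp() {
        return new MqttMessage(new MqttFixedHeader(
                MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
    }

    // PUBACK carrying the packet id of the acknowledged PUBLISH
    public static MqttPubAckMessage pubAckMessage(int packetId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(
                MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        return (MqttPubAckMessage) MqttMessageFactory.newMessage(
                fixedHeader, MqttMessageIdVariableHeader.from(packetId), null);
    }

    // outbound PUBLISH pushed to the subscribing MQTT client
    public static MqttPublishMessage publishMessage(MqttQoS qos, String topic, int packetId, byte[] payload) {
        return MqttMessageBuilders.publish()
                .qos(qos)
                .topicName(topic)
                .messageId(packetId)
                .retained(false)
                .payload(Unpooled.wrappedBuffer(payload))
                .build();
    }
}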


One thing to keep in mind throughout the code above: always include the key context in log messages, such as the topic, the MQTT username, and the MQTT clientId. It does not seem to matter while writing a demo, but when you have to troubleshoot under massive traffic you will appreciate having this information.


We use a simple utility class, BoundInt, to generate packet ids within the 0–65535 range required by the protocol; a sketch follows.
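BoundInt itself is not shown in the article; a minimal sketch that cycles a counter within the bound. Note that the MQTT specification requires packet identifiers to be non-zero, so this sketch starts at 1; the exact behavior of the author's class is an assumption:

package io.github.protocol.mqtt.broker.util;

import java.util.concurrent.atomic.AtomicInteger;

public class BoundInt {

    private final int bound;

    private final AtomicInteger counter = new AtomicInteger(0);

    public BoundInt(int bound) {
        this.bound = bound;
    }

    // returns 1, 2, ..., bound, then wraps back to 1 (packet ids must be non-zero per MQTT)
    public int nextVal() {
        return counter.updateAndGet(prev -> prev >= bound ? 1 : prev + 1);
    }
}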

Pulsar processor implementation


Compared with Kafka, Pulsar is a better fit as the backend of an MQTT proxy, for the following reasons:


  • Pulsar supports millions of topics, and its topics are more lightweight

  • Pulsar natively supports a listener-based consumption model, so there is no need to start a thread per consumer

  • Pulsar supports the Shared subscription type, which allows more flexible consumption patterns

  • A Pulsar consumer's subscribe call guarantees the subscription is created, a semantic guarantee Kafka consumers do not offer


    protected final ReentrantReadWriteLock.ReadLock rLock;
protected final ReentrantReadWriteLock.WriteLock wLock;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;
protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;
private final PulsarProcessorConfig pulsarProcessorConfig;
private final PulsarAdmin pulsarAdmin;
private final PulsarClient pulsarClient;
public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) { super(mqttAuth); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); rLock = lock.readLock(); wLock = lock.writeLock(); this.sessionProducerMap = new HashMap<>(); this.sessionConsumerMap = new HashMap<>(); this.producerMap = new HashMap<>(); this.consumerMap = new HashMap<>(); this.pulsarProcessorConfig = pulsarProcessorConfig; try { this.pulsarAdmin = PulsarAdmin.builder() .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl()) .build(); this.pulsarClient = PulsarClient.builder() .serviceUrl(pulsarProcessorConfig.getServiceUrl()) .build(); } catch (Exception e) { throw new IllegalStateException("Failed to create pulsar client", e); } }


Handling the PUBLISH message:


    @Override    protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                             MqttPublishMessage msg) throws Exception {        String topic = msg.variableHeader().topicName();        Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);        int len = msg.payload().readableBytes();        byte[] messageBytes = new byte[len];        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);        switch (msg.fixedHeader().qosLevel()) {            case AT_MOST_ONCE -> producer.sendAsync(messageBytes).                    thenAccept(messageId -> log.info("clientId [{}],"                                    + " username [{}]. send message to pulsar success messageId: {}",                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))                    .exceptionally((e) -> {                        log.error("clientId [{}], username [{}]. send message to pulsar fail: ",                                mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);                        return null;                    });            case AT_LEAST_ONCE -> {                try {                    MessageId messageId = producer.send(messageBytes);                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,                            false, MqttQoS.AT_MOST_ONCE, false, 0);                    MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,                            MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);                    log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);                    ctx.writeAndFlush(pubAckMessage);                } catch (PulsarClientException e) {                    log.error("clientId [{}], username [{}]. send pulsar error: {}",                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());                }            }            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));        }    }
private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception { MqttTopicKey mqttTopicKey = new MqttTopicKey(); mqttTopicKey.setTopic(topic); mqttTopicKey.setMqttSessionKey(mqttSessionKey);
rLock.lock(); try { Producer<byte[]> producer = producerMap.get(mqttTopicKey); if (producer != null) { return producer; } } finally { rLock.unlock(); }
wLock.lock(); try { Producer<byte[]> producer = producerMap.get(mqttTopicKey); if (producer == null) { producer = createProducer(topic); sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> { if (mqttTopicKeys == null) { mqttTopicKeys = new ArrayList<>(); } mqttTopicKeys.add(mqttTopicKey); return mqttTopicKeys; }); producerMap.put(mqttTopicKey, producer); } return producer; } finally { wLock.unlock(); } }
protected Producer<byte[]> createProducer(String topic) throws Exception { return pulsarClient.newProducer(Schema.BYTES).topic(topic).create(); }


Handling the SUBSCRIBE message:


    @Override    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                               MqttSubscribePayload subscribePayload) throws Exception {        for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {            subscribe(ctx, mqttSessionKey, topicSubscription.topicName());        }    }
protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, String topic) throws Exception { MqttTopicKey mqttTopicKey = new MqttTopicKey(); mqttTopicKey.setTopic(topic); mqttTopicKey.setMqttSessionKey(mqttSessionKey);
wLock.lock(); try { Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey); if (consumer == null) { consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic); sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> { if (mqttTopicKeys == null) { mqttTopicKeys = new ArrayList<>(); } mqttTopicKeys.add(mqttTopicKey); return mqttTopicKeys; }); consumerMap.put(mqttTopicKey, consumer); } } finally { wLock.unlock(); } }
protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username, String topic) throws Exception { BoundInt boundInt = new BoundInt(65535); try { PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false); log.info("topic {} partitioned stats {}", topic, partitionedStats); } catch (PulsarAdminException.NotFoundException nfe) { log.info("topic {} not found", topic); pulsarAdmin.topics().createPartitionedTopic(topic, 1); } return pulsarClient.newConsumer(Schema.BYTES).topic(topic) .messageListener((consumer, msg) -> { log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId()); MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage( MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData()); ctx.writeAndFlush(mqttPublishMessage); }) .subscriptionName(username).subscribe(); }


Test cases. Robust software should come with test cases. Here we write two basic pub/sub cases; a real production-ready project would have far more complex tests covering all kinds of failure scenarios. There is a saying I agree with: "unit tests are instant positive feedback for developers."


Kafka: to start a test Kafka broker for unit tests we can use the embedded-kafka-java project, pulling in the dependency with the following coordinates:


    <dependency>
        <groupId>io.github.embedded-middleware</groupId>
        <artifactId>embedded-kafka-core</artifactId>
        <version>0.0.2</version>
        <scope>test</scope>
    </dependency>


We can then start the Kafka-backed MQTT broker with the following code:


@Slf4jpublic class MqttKafkaTestUtil {
public static MqttServer setupMqttKafka() throws Exception { EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer(); new Thread(() -> { try { embeddedKafkaServer.start(); } catch (Exception e) { log.error("kafka broker started exception ", e); } }).start(); Thread.sleep(5_000); MqttServerConfig mqttServerConfig = new MqttServerConfig(); mqttServerConfig.setPort(0); mqttServerConfig.setProcessorType(ProcessorType.KAFKA); KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig(); kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort())); mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig); MqttServer mqttServer = new MqttServer(mqttServerConfig); new Thread(() -> { try { mqttServer.start(); } catch (Exception e) { log.error("mqsar broker started exception ", e); } }).start(); Thread.sleep(5000L); return mqttServer; }
}


The Kafka end-to-end test case is simple: publish a message through the MQTT client and then consume it back:


@Log4j2public class MqttKafkaPubSubTest {
@Test public void pubSubTest() throws Exception { MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka(); String topic = UUID.randomUUID().toString(); String content = "test-msg"; String broker = String.format("tcp://localhost:%d", mqttServer.getPort()); String clientId = UUID.randomUUID().toString(); MemoryPersistence persistence = new MemoryPersistence(); MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(UUID.randomUUID().toString()); connOpts.setPassword(UUID.randomUUID().toString().toCharArray()); connOpts.setCleanSession(true); log.info("Mqtt connecting to broker"); sampleClient.connect(connOpts); CompletableFuture<String> future = new CompletableFuture<>(); log.info("Mqtt subscribing"); sampleClient.subscribe(topic, (s, mqttMessage) -> { log.info("messageArrived"); future.complete(mqttMessage.toString()); }); log.info("Mqtt subscribed"); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(1); log.info("Mqtt message publishing"); sampleClient.publish(topic, message); log.info("Mqtt message published"); TimeUnit.SECONDS.sleep(3); sampleClient.disconnect(); String msg = future.get(5, TimeUnit.SECONDS); Assertions.assertEquals(content, msg); }
}


Pulsar: we can use the embedded-pulsar-java project to start a Pulsar broker for unit tests, pulling in the dependency with the following coordinates:


    <dependency>
        <groupId>io.github.embedded-middleware</groupId>
        <artifactId>embedded-pulsar-core</artifactId>
        <version>0.0.2</version>
        <scope>test</scope>
    </dependency>


We can then start the Pulsar-backed MQTT broker with the following code:


@Slf4jpublic class MqttPulsarTestUtil {
public static MqttServer setupMqttPulsar() throws Exception { EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer(); embeddedPulsarServer.start(); MqttServerConfig mqttServerConfig = new MqttServerConfig(); mqttServerConfig.setPort(0); mqttServerConfig.setProcessorType(ProcessorType.PULSAR); PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig(); pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort())); pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort())); mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig); MqttServer mqttServer = new MqttServer(mqttServerConfig); new Thread(() -> { try { mqttServer.start(); } catch (Exception e) { log.error("mqsar broker started exception ", e); } }).start(); Thread.sleep(5000L); return mqttServer; }}


The Pulsar end-to-end test case is likewise simple: publish a message through the MQTT client and then consume it back:


@Log4j2public class MqttPulsarPubSubTest {
@Test public void pubSubTest() throws Exception { MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar(); String topic = UUID.randomUUID().toString(); String content = "test-msg"; String broker = String.format("tcp://localhost:%d", mqttServer.getPort()); String clientId = UUID.randomUUID().toString(); MemoryPersistence persistence = new MemoryPersistence(); MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(UUID.randomUUID().toString()); connOpts.setPassword(UUID.randomUUID().toString().toCharArray()); connOpts.setCleanSession(true); log.info("Mqtt connecting to broker"); sampleClient.connect(connOpts); CompletableFuture<String> future = new CompletableFuture<>(); log.info("Mqtt subscribing"); sampleClient.subscribe(topic, (s, mqttMessage) -> { log.info("messageArrived"); future.complete(mqttMessage.toString()); }); log.info("Mqtt subscribed"); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(1); log.info("Mqtt message publishing"); sampleClient.publish(topic, message); log.info("Mqtt message published"); TimeUnit.SECONDS.sleep(3); sampleClient.disconnect(); String msg = future.get(5, TimeUnit.SECONDS); Assertions.assertEquals(content, msg); }}

Performance optimization


Here we briefly describe a few optimization points. Tuning knobs such as thread counts and buffer sizes are not covered; those settings should be decided by concrete performance benchmarks.

Using the Epoll transport on Linux


public class EventLoopUtil {

    /**
     * @return an EventLoopGroup suitable for the current platform
     */
    public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(nThreads, threadFactory);
        } else {
            return new NioEventLoopGroup(nThreads, threadFactory);
        }
    }

    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
        if (eventLoopGroup instanceof EpollEventLoopGroup) {
            return EpollServerSocketChannel.class;
        } else {
            return NioServerSocketChannel.class;
        }
    }
}


We check Epoll.isAvailable(), and when specifying the channel class we choose the matching channel type based on the type of the event loop group:


        EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-acceptor"));
        EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-worker"));


                b.group(acceptorGroup, workerGroup)
                    // key point
                    .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                            p.addLast(new MqttHandler(processor(mqttServerConfig)));
                        }
                    });


Disable TCP keepalive. Since MQTT has its own heartbeat mechanism, we can turn off TCP keepalive and rely on the protocol-level heartbeat, saving resources at massive connection counts. Just set ChannelOption.SO_KEEPALIVE to false:


                .option(ChannelOption.SO_KEEPALIVE, false)


Shorten the timeouts. By default the timeouts are relatively long (typically 30s), whether for the MQTT client in unit tests or for the Pulsar and Kafka producers. When deploying inside an internal network, you can reduce them to around 5s to avoid pointless waiting; an example follows.
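Exactly where these knobs live depends on each client. An illustrative fragment for the three clients used in this article; the 5-second values are examples, not the article's settings:

        // Paho MQTT client (used in the tests): connect timeout in seconds
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setConnectionTimeout(5);

        // Kafka producer: bound each request and the overall delivery attempt
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000);
        properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5_000);

        // Pulsar producer: send timeout
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create();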


Use multiple KafkaProducers to improve performance. A single KafkaProducer will eventually hit the bandwidth limit of its TCP connection. Under massive request volume, when Kafka produce latency stands out, consider starting several KafkaProducers and, exploiting the characteristics of MQTT (many connections, each with modest QPS), use the hash of the mqttSessionKey to decide which KafkaProducer sends a given message.


Add the following option to KafkaProcessorConfig: the number of producers, defaulting to 1.


    private int producerNum = 1;


During initialization, build an array of producers instead of a single producer:


    this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
    for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
        producerArray[i] = createProducer();
    }


Wrap a small method to pick the producer:


    private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
        return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
    }
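With that in place, the publish path swaps the shared producer for a per-session pick; a sketch of the call site (illustrative, not the author's exact code):

        // pick a producer by session hash instead of using the single shared instance
        Producer<String, ByteBuffer> producer = getProducer(mqttSessionKey);
        producer.send(record, (metadata, exception) -> {
            // same success/error handling as in the single-producer version above
        });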

Conclusion


All the code in this article has been uploaded to GitHub. We have only implemented basic MQTT connect, publish, and subscribe; we do not even support pause or unsubscribe yet. A mature, commercial-grade MQTT gateway also needs tenant isolation, broader protocol support, reliability, operability, flow control, security, and more. If you have production-grade MQTT requirements and cannot quickly build a mature gateway yourself, consider the Huawei Cloud IoTDA service, which provides a stable and reliable MQTT service supporting massive device connections to the cloud and bidirectional messaging between devices and the cloud.

