写点什么

CnosDB 向 EMQX 实时推送消息,实现端到端的数据实时流转

作者:CnosDB
  • 2024-12-26
    德国
  • 本文字数:1388 字

    阅读完需:约 5 分钟

CnosDB向EMQX实时推送消息,实现端到端的数据实时流转


《构建高性能物联网数据平台:EMQX和CnosDB的完整教程》中,我们探讨了如何利用 EMQX 的数据桥接功能,将采集来的时序数据实时传输到 CnosDB 中。本次分享将进一步介绍如何通过 CnosDB 的订阅功能,实现数据推送至 EMQX。通过这种方式,我们不仅能够确保数据的即时传输,还能充分发挥两者的优势,提升整体数据处理能力。


前置条件

在开始之前,需要准备以下工具和环境:


  1. CnosDB 是一款高性能、高压缩率的开源分布式时序数据库。作为数据的发起端,需要将数据写入 CnosDB,并分发到 EMQX。

  2. Cnos-Telegraf 是一款基于 Telegraf 开源的数据采集代理,主要用于数据监控、处理与分析。它可以将数据分发到 CnosDB 或将 CnosDB 中的数据进行外部订阅。

  3. EMQX(Erlang/OTP MQTT Broker)是基于 Erlang/OTP 开发的开源 MQTT 消息中间件,负责在数据库场景中进行消息传递和数据流转。

  4. mosquitto_sub 是一个命令行工具,用于订阅 MQTT 主题。EMQX 中的数据以主题为单位进行发布和订阅。通过 mosquitto_sub 订阅特定主题,可以获取相关数据。

环境搭建

  1. 操作系统


  • Ubuntu 20.04.6 LTS x86_64


  1. EMQX


从官方仓库下载最新的 EMQX 镜像(版本 5.8.2)。启动容器服务时,请确保将 1883 端口映射出来:


其他安装方式请参考:https://docs.cnosdb.com/docs/deploy/


docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx:latest
复制代码


  1. Cnos-Telegraf 服务


通过官方仓库下载 Cnos-Telegraf 的最新代码并编译。修改配置文件如下:


[global_tags][agent]  interval = "10s"  round_interval = true  metric_batch_size = 1000  metric_buffer_limit = 10000  collection_jitter = "0s"  flush_interval = "10s"  flush_jitter = "0s"  precision = "0s"
[[outputs.mqtt]] servers = ["tcp://127.0.0.1:1883"] topic = "telegraf/metrics" username = "admin" password = "public" data_format = "influx" [[inputs.cnosdb_subscription]] service_address = ":7703"
复制代码


启动 Cnos-Telegraf 服务:


./bin/telegraf --config ./telegraf.conf
复制代码


  1. mosquitto_sub 服务


如果未安装 mosquitto_sub,可以通过以下命令安装:


sudo apt-get updatesudo apt-get install mosquitto-clients
复制代码


启动 mosquitto_sub 监听 EMQX 的订阅:


mosquitto_sub -h 127.0.0.1 -u admin -P public -t telegraf/metrics -p 1883
复制代码


  1. 下载并编译最新版企业版 CnosDB(版本 2.4.2),使用默认配置启动服务:


./target/release/cnosdb run --config ./config/config_8902.toml -M singleton
复制代码


在 CnosDB 中创建订阅(目标端口为 Cnos-Telegraf 的监听端口):


CREATE SUBSCRIPTION test ON public DESTINATIONS ALL "127.0.0.1:7703";
复制代码


然后创建表并插入数据:


CREATE TABLE air (visibility DOUBLE, temperature DOUBLE, pressure DOUBLE, TAGS(station));INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES (1667456411000000000, 'XiaoMaiDao1', 56, 69, 411);
复制代码

数据验证

写入成功后,CnosDB 表中和  mosquitto_sub 的订阅都会收到一条数据。如下:


  • CnosDB 接收到的数据:


  • 推送至 EMQX 中并使用 mosquitto_sub 订阅的数据(默认格式):


以上就是将 CnosDB 数据发送到 EMQX 的完整流程。希望这次的分享能够帮助你更好地理解数据流转的实现!



欣赏 CnosDB 社区名画:




发布于: 刚刚阅读数: 4
用户头像

CnosDB

关注

还未添加个人签名 2022-04-18 加入

打造高性能、高压缩比、高可用的分布式云原生时间序列数据库,引领世界迈向万物智联 欢迎关注 https://www.cnosdb.com

评论

发布
暂无评论
CnosDB向EMQX实时推送消息,实现端到端的数据实时流转_AI_CnosDB_InfoQ写作社区