写点什么

KWDB 多模分布式数据库助力共享打印机物联网 IoT 最佳实践落地,实现高效存储与查询时序数据

作者:KaiwuDB
  • 2025-07-08
    重庆
  • 本文字数:19664 字

    阅读完需:约 65 分钟

KWDB多模分布式数据库助力共享打印机物联网IoT最佳实践落地,实现高效存储与查询时序数据

作者:Sunny_媛


原文链接:https://blog.csdn.net/m0_68635815/article/details/148190910

一、前言:

近年来,随着物联网(IoT)、人工智能(AI)、云计算、大数据等技术的快速发展,随着物联网设备的普及和智能设备的增加,产生了大量的时间序列数据,这些数据需要被实时存储和处理。时序数据库以其高效处理大量时间序列数据的能力,成为处理这些数据的理想选择‌。


那么,今天给大家带来的一款 - KWDB 作为一款面向 AIoT 场景的国产分布式数据库,凭借其高效的时序数据处理能力与多模融合特性,逐渐成为行业关注的焦点。本文将从 Docker 容器化部署集群方式,到 KWDB 落地最佳实践案例,再到 KWDB 的技术分析,本文将以 3 台 ECS 上部署 docker 集群来演示,让大家可以快速了解并体验 KWDB 多模数据库的强大功能与便捷管理功能。


二、什么是时序数据库?

时序数据库‌是一种专门设计用于处理‌时间序列数据‌的数据库系统。时间序列数据通常是指按照时间顺序记录的数据,通常包括时间戳和对应的观测值。时序数据广泛应用于各种领域,如物联网 IoT、智能城市、金融市场分析、气象预测、交通流量监测和生产过程监控等‌。

2.1 时序数据库的有什么样的特点呢?主要的用途场景有哪些呢?



时序数据库(Time-Series Database, TSDB)是专为处理带时间标签的数据而设计的数据库系统,其核心作用包括高效存储、快速查询和分析时间序列数据,适用于高频率、大规模且时间依赖性强的数据场景。

2.2 关系型数据库与时序数据库的差异与不同:

在物联网 IoT 时代,各类数据如潮水般涌现,数据处理与存储需求日趋复杂,数据类型的多元化直接推动了专用数据库的发展。平时也是接触过最多的是 OLAP 与 OLTP 这种关系型数据库,那么我们可以来对比一下时序数据库主要用途。


2.2.1 关系型数据库:

支持事务处理,其事务机制一般将一系列数据库操作组合成一个逻辑单元。比如银行转账场景中,A 账户转出 100 元,B 账户转入 100 元,该事务中的所有操作只能全部成功执行或全部不执行。分布式系统中关系型数据库的事务机制需要记录数据过程状态,并提供失败回滚机制,进行并发控制。

2.2.2 时序数据库:

传感器采集的数据只来自单一数据源,每条数据都是传感器对该时刻测点数值(如打印了几张纸,打印是黑白还是彩色)的真实记录,不存在同时修改两个数值的场景,也没有写冲突的场景(多个人同时修改同一条数据),因此事务在时序数据库中是不必要的,时序数据库需要优先保证的是大量数据的高效稳定写入。



作为数据管理领域的两种核心引擎,时序数据库与关系型数据库分别服务于不同的数字化场景:前者凭借其通用性优势,广泛支撑着电商、物流、企业管理系统等业务,后者专精于处理按时间生成的时序数据,主要应用于工业物联网、金融等领域。

2.3 常见的时序数据库有哪些?

由于关系型数据库天生的劣势导致其无法进行高效的存储和数据的查询,因此需要一种专门针对时间序列数据来做优化的数据库系统,即时间序列数据库,主要用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据。



那么,有这么多时序数据库产品,KWDB 到底有何优势呢?接下来,让我们一起来逐一来解读,KWDB 在开放性、可扩展性、性能和易用性方面具备显著优势,KWDB 可以在同一数据库实例内支持同时构建关系型与时序型数据库,实现统一接入、融合存储与分析处理,可高效应对多模态数据融合处理挑战。

三、KWDB 面向 AIoT 场景的分布式、多模融合数据库产品:

KWDB 是一款面向 AIoT 场景的分布式、多模融合数据库产品。支持在同一个实例中建立时序库和关系库,并统一处理多种类型的数据,具备对海量时序数据的高效读写与分析能力。产品具备高可用、安全稳定、易运维等特性,广泛应用于工业物联网、数字能源、车联网、智慧矿山等多个行业领域,为用户提供一站式数据存储、管理与分析的基础平台。



①. 面向 AIoT 场景的分布式、多模融合、支持原生 AI 的数据库产品。


②. 同一实例同时建立时序库和关系库并融合处理多模数据。


③. 具备千万级设备接入、百万级数据秒级写入、亿级数据秒级读取等时序数据高效处理能力。


④. 具有稳定安全、高可用、易运维等特点。


⑤. 面向工业物联网、数字能源、车联网、智慧产业等领域。


⑥. 提供一站式数据存储、管理与分析的基座。

四、Docker 集群部署方案:

首先在安装集群之前,可以思考一下在集群规划阶段,需要合理规划集群的拓扑结构、硬件环境、安全性等方面,目前 KWDB 支持 2 种集群模式:


①. 单副本集群:整个集群只有 1 份数据副本,所有数据的存储和更新操作都由该副本负责,集群节点出现故障时,数据写入、查询和 DDL 操作可能失败。


②. 多副本集群:每份数据默认有 3 份副本,且副本分布在不同节点上,支持高可用性、能够实现故障转移、能够实现数据强一致性等优势。



那么,我们这里推荐去演示的环境也是 3 台设备,为了方便演示,这里使用阿里云的云 ECS 服务器来进行 docker 集群的部署,安装 KWDB 集群相关的软件:


4.1 安装前的准备:

本文实践为 Docker 3 台设备集群环境,操作系统版本依赖的是 Ubuntu 22.04.1。


在部署 KWDB 集群时,系统将对配置文件、运行环境、硬件配置、软件依赖和 SSH 免密登录进行检查,需要根据以下内容检查待部署节点的硬件和软件环境是否符合要求,这里有 2 种安装 KWDB 集群的方式:


①. 裸机部署:使用软件包安装搭建 KWDB 集群方式。


②. 容器部署:使用 docker 镜像容器搭建 KWDB 集群方式。



在使用之前首先一定要在本地环境中准备好 docker、docker-compose 等必要的工具,如果没有的话,建议大家可以先使用以下 shell 脚本进行安装 Docker。


创建一个 Shell 脚本文件名字叫“install_docker.sh”,在脚本中编写相关安装 Docker 的命令。


#!/bin/bash
# 更新包列表apt-get update
# 安装必要的软件包apt install -y apt-transport-https ca-certificates curl gnupg lsb-release software-properties-common
# 创建密钥环目录mkdir -p /etc/apt/keyrings
# 获取并添加Docker的GPG密钥curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker-archive-keyring.gpg
# 添加Docker的官方APT源echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker-archive-keyring.gpg] https://mirrors.aliyun.com/docker-ce/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
# 更新包列表apt update
# 安装Docker及相关工具apt install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
# 重载Docker守护进程配置systemctl daemon-reload
# 重启Docker服务systemctl restart docker
echo "Docker安装完成并重启。"
复制代码


给脚本“install_docker.sh”添加执行权限并运行脚本:


chmod +x install_docker.sh && ./install_docker.shdocker versiondocker compose version
复制代码


查看 3 台 ECS 云服务器是否全部安装完成 docker 和 docker-compose,并且检查相关版本,我们这次 docker 的般般是 28.1.1,docker compose 的版本是 v2.25.1。



注意一下,三台 CES 主机都需要进行安装 docker 和 docker-compose 软件,因为每台云主机都要跑一个 docker 镜像。

4.2 拉取官方 KWDB 镜像:

因为这次官方有版本要求(v2.2.0),执行 docker pull 拉取 KWDB 容器镜像,这里下载安装的 KWDB 镜像为 kwdb/kwdb:2.2.0。


docker pull kwdb/kwdb:2.2.0
复制代码



注意一下,三台 CES 主机都需要进行 docker 下载 kwdb/kwdb:2.2.0 镜像,因为每台云主机都要跑一个 docker 镜像。



在部署过程中,系统会自动生成相关日志。如果部署时出现错误,用户可以通过查看终端输出或 KWDB 安装目录中 log 目录里的日志文件,获取详细的错误信息。

4.3 生成 KWDB TLS 安全模式证书文件:

以非安全模式部署的集群存在严重的安全风险,KWDB 强烈建议采用安全模式部署集群,通过 TLS 或 TLCP 加密技术验证节点和客户端的身份,并对节点与客户端之间的数据传输进行加密,有效防范未经授权的访问和数据篡改。



因此,我们使用的是 3 台 ECS 云服务器,所以是跨机器的 TLS 安全模式部署,以 TLS 安全模式部署 KWDB 集群, 使用以下命令创建数据库证书颁发机构、root 用户的客户端证书以及节点服务器证书,并且需要同时把这个证书 copy 到其它的云 ECS 服务器上。


①. 使用 ./kwbase cert create-node 命令为所有节点创建证书和密钥。


②. 将 CA 证书和密钥、节点证书和密钥传输至所有节点。


③. 如果需要在其它节点上运行 KWDB 客户端命令,还需要将 root 用户的证书和密钥复制到该节点。


④. 只有拥有 root 用户证书和密钥的节点,才能够访问集群。


docker run --rm --privileged -v /kwdb/certs:/kaiwudb/certs -w /kaiwudb/bin kwdb/kwdb:2.2.0  \bash -c './kwbase cert create-ca --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \        ./kwbase cert create-client root --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \        ./kwbase cert create-node 127.0.0.1 localhost 0.0.0.0 8.154.42.14 --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key'
复制代码



创建完成后,即可以在本地/kwdb/certs 目录下面生成了一些 CA 的相关证书文件,比如,用户客户端 CA 证书及节点服务器 CA 证书,在访问的时候,就可以通过这个生成的 CA 证书来进行 TLS 安全模式访问。


4.4 创建 KWDB TLS 安全模式部署集群节点:

上面已经帮我们生成了 3 种 CA 证书了,通过 tar 压缩包之后,再通过 scp 同步到其它 2 个节点的 ECS 服务器,这样,就可以进行 KWDB 数据库实例的容器创建了,以下是相关的创建命令,关于创建的相关参数解释如下:


备注:具体参数详情解释可以参考官方页面:https://www.kaiwudb.com/template_version/pc/doc/db-operation/cluster-settings-config.html


docker 创建相关参数解释:



docker 容器启动执行命令相关参数解释:



在了解了相关的参数后,接下来我们就可以使用 TLS 安全模式部署 KWDB 集群,将分别进行 3 台 ECS 云服务器的部署,再以 docker ps 命令查看是否跑起来了。


第一台 kwdb-cluster-server001 云服务器 docker 创建数据库实例命令:


docker run -d --name kwdb-cluster-server001 --privileged \  --ulimit memlock=-1 --ulimit nofile=1048576 \  -p 26257:26257 -p 8080:8080 \  -v /kwdb/certs:/kaiwudb/certs \  -v /kwdb/kwdb-cluster-server001:/kaiwudb/deploy/kaiwudb-container \  --ipc shareable -w /kaiwudb/bin \  kwdb/kwdb:2.2.0 \  ./kwbase start --certs-dir=/kaiwudb/certs --listen-addr=0.0.0.0:26257 \  --advertise-addr=8.154.42.14:26257 --http-addr=0.0.0.0:8080 \  --store=/kaiwudb/deploy/kaiwudb-container --join 8.154.42.14:26257
复制代码


第二台 kwdb-cluster-server002 云服务器 docker 创建数据库实例命令:


docker run -d --name kwdb-cluster-server002 --privileged \  --ulimit memlock=-1 --ulimit nofile=1048576 \  -p 26257:26257 -p 8080:8080 \  -v /kwdb/certs:/kaiwudb/certs \  -v /kwdb/kwdb-cluster-server002:/kaiwudb/deploy/kaiwudb-container \  --ipc shareable -w /kaiwudb/bin \  kwdb/kwdb:2.2.0 \  ./kwbase start --certs-dir=/kaiwudb/certs --listen-addr=0.0.0.0:26257 \  --advertise-addr=8.154.31.6:26257 --http-addr=0.0.0.0:8080 \  --store=/kaiwudb/deploy/kaiwudb-container --join 8.154.42.14:26257
复制代码


第三台 kwdb-cluster-server003 云服务器 docker 创建数据库实例命令:


docker run -d --name kwdb-cluster-server003 --privileged \  --ulimit memlock=-1 --ulimit nofile=1048576 \  -p 26257:26257 -p 8081:8080 \  -v /kwdb/certs:/kaiwudb/certs \  -v /kwdb/kwdb-cluster-server003:/kaiwudb/deploy/kaiwudb-container \  --ipc shareable -w /kaiwudb/bin \  kwdb/kwdb:2.2.0 \  ./kwbase start --certs-dir=/kaiwudb/certs --listen-addr=0.0.0.0:26257 \  --advertise-addr=47.110.243.180:26257 --http-addr=0.0.0.0:8080 \  --store=/kaiwudb/deploy/kaiwudb-container --join 8.154.42.14:26257
复制代码



①. 部署 KWDB 数据实例 1(kwdb-cluster-server001),将 advertise-addr 和 join 对应的 IP 都改成自己对应的公网 IP 地址。


②. 部署 KWDB 数据实例 2(kwdb-cluster-server002),将 advertise-addr 对应的 ip 都改成自己对应的公网 IP 地址,同时,设置 join 对应的 IP 设置为 KWDB 数据库实例 1(kwdb-cluster-server001)的 IP 地址。


③. 部署 KWDB 数据实例 3(kwdb-cluster-server003),将 advertise-addr 对应的 ip 都改成自己对应的公网 IP 地址,同时,设置 join 对应的 IP 设置为 KWDB 数据库实例 1(kwdb-cluster-server001)的 IP 地址。


安全端口说明:


下表列出 KWDB 服务需要映射的端口,在安装部署前,确保目标机器的以下端口没有被占用且没有被防火墙拦截。如果是使用到云服务器的话,则需要将网络安全组的端口限制打开(刚开始没想到,调试很久了):


①. 8080 端口:数据库 Web 服务端口


②. 26257 端口:数据库服务端口、节点监听端口和对外连接端口

4.5 KWDB TLS 安全部署模式创建 Docker 集群实例:

其它详细参数可以参考官网地址:https://www.kaiwudb.com/template_version/pc/doc/tool-command-reference/client-tool/kwbase-sql-reference.html#%E4%BD%BF%E7%94%A8%E4%B8%BE%E4%BE%8B


kwbase init 命令用于初始化 KWDB 集群,使用 TLS 安全模式来访问、验证部署的集群,连接的 KWDB 指定的 IP 节点。


# 初始化KWDB集群docker exec kwdb-cluster-server001 ./kwbase init --certs-dir=/kaiwudb/certs --host=8.154.42.14:26257# 检查所有容器实例的状态docker exec -it kwdb-cluster-server001 ./kwbase  node status --certs-dir=/kaiwudb/certs
复制代码


在初始化完成后,如果执行成功会提示“Cluster successfully initialized”,就表示成功创建了集群,可以使用 node status 来进行查看 KWDB 集群的节点状态,可以看到 3 台集群设备都加进来了,而且状态都是 is_available 的,表示是可以使用的。



# kwbase sql 命令用于开启交互式 SQL Shelldocker exec -it kwdb-cluster-server001 ./kwbase sql --host=8.154.42.14:26257 --certs-dir=/kaiwudb/certs
复制代码


也可以使用 kwbase sql 命令用于开启交互式 SQL Shell,进入数据库控制台中:


①. 创建一个时序数据库,这里需要带一个 TS 的标识(对应的引擎 Time series)。


②. 创建完成后,使用常见的 SQL 查询语句 show databases 即可查看所有的数据库,不需要额外学习其它的 SQL 知识。


③. 在创建时序数据库 printer_iot 之后,我们可以看到其它 2 个 cluster 的节点也自动同步了数据库。

五、场景实操 - KWDB 分布式多模数据库在共享打印机 IoT 最佳实践应用:



公司主要是做打印机共享办公的,在日常办公、学习生活里,打印这件事儿,总能整出不少烦心事:公司赶着开重要会议,资料却还在打印机那儿“排队难产”;学生党熬夜赶出的论文,到打印店一看,色差严重、排版错乱等众多问题,公司推出共享打印机物联网 IoT 模式彻底革新传统打印模式!

5.1 公司共享打印机 IoT 业务介绍:

打印机物联网系统的数据上传物联网数据多种纬度的业务相关时序数据,但是也需要结合设备功能特征和业务场景进行针对性采集,实际采集方案需根据业务场景动态调整,例如共享打印机需强化用户计费数据采集,而工业级设备侧重设备诊断数据密度。


5.2 业务相关表格构创建:

根据上面物联网共享打印机设备相关的数据体系结构,我们可以将创建 7 张不同纬度的时序数据表,用来实际的项目场景体验,如下为在创建表中遇到的一些问题,也可以帮助大家一起来规避。


问题一:创建时序表时,提示:“ERROR: tag printer_id can not be a nullable tag as primary tag”,通过查询原因发现是将 printer_id 设置为既是主键又是可以为空(NULL)的,增加 NOT NULL 属性即可解决问题。


CREATE TABLE printer_status (    timestamp TIMESTAMPTZ NOT NULL,    temperature FLOAT,    fan_speed INTEGER,    print_head_temp FLOAT,    humidity FLOAT,    power_status VARCHAR(50),    error_code VARCHAR(50),    created_at TIMESTAMPTZ NOT NULL,    model VARCHAR(50),    firmware_version VARCHAR(50)) TAGS (printer_id CHAR(32)) PRIMARY TAGS (printer_id);
复制代码



问题二:在创建时序表时,提示:“ERROR: column error_message: unsupported column type string in timeseries table”,通过查询原因发现是尝试将一个不适合直接用于时间序列分析的数据类型(如字符串类型)用在时间序列分析的函数或模型中,将 error_code 中的 text 类型改为 varchar 类型即可解决问题。


CREATE TABLE printer_status (    timestamp TIMESTAMPTZ NOT NULL,    temperature FLOAT,    fan_speed INTEGER,    print_head_temp FLOAT,    humidity FLOAT,    power_status VARCHAR(50),    error_code VARCHAR(50),    created_at TIMESTAMPTZ NOT NULL,    model VARCHAR(50),    firmware_version VARCHAR(50)) TAGS (printer_id CHAR(32) NOT NULL) PRIMARY TAGS (printer_id);
复制代码



最后我们将这 7 张物联网 IoT 相关的业务时序数据表如下,可以看到 table_type 类型也是跟时序数据库一样,是 Time series table 的类型,表示这是时序数据表,跟关系型数据表有略微点不同。


5.3 物联网共享打印机 C 端数据上报最佳实践方案落地实施:

接下来我们使用 go 代码写一段自动生成的脚本来跑一下集群写入数据的方案来试试,来验证一下时序数据库在集群中的性能来对比一下。


首先是 KWDB 数据库连接方式与其它 MySQL 和 PostgreSQL 略有区别,一般来说,在关系型数据库中只需要提供用户名和密码,即可连接到数据库的实例中来,但是上面我们使用的是 TLS 安全模式的部署方式来安装的,所以,这里我们需要使用相关 CA 证书来连接 KWDB 数据库实例。


url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s",        "root", "8.154.42.14:26257", "printer_iot",        "./certs/ca.crt",        "./certs/client.root.crt",        "./certs/client.root.key")
config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) }
config.RuntimeParams["application_name"] = "batch_insert_all_tables" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background())
复制代码


需要使用到 ca.crt、client.root.crt、client.root.key 这几个文件来进行 TLS 安全模式的验证,这里我们使用的是 github.com/jackc/pgx/v5 的应用程序连接器,如果是其它语言的同学,可以参考官网连接示例:https://www.kaiwudb.com/kaiwudb_docs/#/development/overview.html



接下来就是表的数据生成,这里我们以 printer_status 表为参考实例,使用脚本来生成相关的测试数据来模拟真实场景中大批量的数据插入情况,以下是相关的代码:


func insertPrinterStatus(conn *pgx.Conn, batchSize int, totalInserted map[string]int) {    var values []interface{}    var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 59.038678+float64(i)*0.1, // temperature 1510+i, // fan_speed 83.219425+float64(i)*0.1, // print_head_temp 0.471526+float64(i)*0.01, // humidity "on", // power_status fmt.Sprintf("E%d", 2791+i), // error_code timestamp, // created_at "Model-7", // model "v3.0.3", // firmware_version fmt.Sprintf("printer_%04d", totalInserted["printer_status"]+i), // printer_id )
start := i * 11 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8, start+9, start+10) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting printer_status data: %v\n", err) return }
totalInserted["printer_status"] += batchSize log.Printf("成功插入 %d 条 printer_status 记录,总计: %d\n", batchSize, totalInserted["printer_status"])}
复制代码



这里需要注意一下就是时间戳的问题,在刚开始测试时,发现使用 time.Now()函数,显示成功,但是就是数据库 count 没有数据,只能打印 SQL 语句在 shell 控制台进行操作,发现时间上有问题,后面改为 time.Now().Format(“2006-01-02 15:04:05.000Z07:00”)时间格式即可插入到 KWDB 多模分布式数据库中。


// 设置批量插入的参数    batchSize := 5000 // 每批次插入记录数    totalInserted := make(map[string]int)
for { // 为每个表执行批量插入 insertPrinterStatus(conn, batchSize, totalInserted) insertPrinterCounter(conn, batchSize, totalInserted) insertConsumableStatus(conn, batchSize, totalInserted) insertPrintJob(conn, batchSize, totalInserted) insertPaymentRecord(conn, batchSize, totalInserted) insertFirmwareUpgrade(conn, batchSize, totalInserted) insertHardwareCheck(conn, batchSize, totalInserted) }
复制代码

5.3.1 问题描述 - SQL 语句执行成功,但是数据库不增加数据:

但是在实际的代码测试中,遇到以下问题,SQL 并没有抛错,但是数据库也不增加数据?只能进行打印 SQL 日志在 shell 控制台来进行测试。

5.3.2 异常原因分析:

发现报错的语句在 SQL shell 控制台中是执行成功了,显示 insert 1,表示成功了,但是查询 select count(*)总数时,并没有发现成功了,经过与插入成功的数据字段类型与值进行双向比对,发现是时间的原因,更换正确的时间后插入正常。


# 报错语句:INSERT INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES ('2025-05-23T11:00:42Z', 58.476,534, 38.6705, 89.04077, 'error','E4380', '2025-05-23T11:00:42Z', 'Model-7', 'v2.5.3', 'printer_0000');# 正常语句:INSERT INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES ('2025-05-23 11:00:42.605+00:00', 58.476,534, 38.6705, 89.04077, 'error','E4380', '2025-05-23 11:00:42.605+00:00', 'Model-7', 'v2.5.3', 'printer_0000');
复制代码



批量设置插入数据参数逻辑:


// 设置批量插入的参数    batchSize := 5000 // 每批次插入记录数    totalInserted := make(map[string]int)
for { // 为每个表执行批量插入 insertPrinterStatus(conn, batchSize, totalInserted) insertPrinterCounter(conn, batchSize, totalInserted) insertConsumableStatus(conn, batchSize, totalInserted) insertPrintJob(conn, batchSize, totalInserted) insertPaymentRecord(conn, batchSize, totalInserted) insertFirmwareUpgrade(conn, batchSize, totalInserted) insertHardwareCheck(conn, batchSize, totalInserted) }
复制代码



接下来就是批量数据操作函数定义,这里可以设置批量插入的数量,但是发现超过 5000 行,就会显示“error batch inserting data: Error: more than 65535 arguments to prepared statmement: 1100000(SQLSTATE 08P01)”。


2025/05/23 21:54:12 error batch inserting data: ERROR: more than 65535 arguments to prepared statement: 1100000 (SQLSTATE 08P01)2025/05/23 21:54:21 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)2025/05/23 21:54:30 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)2025/05/23 21:54:40 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)2025/05/23 21:54:49 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)2025/05/23 21:54:58 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)2025/05/23 21:55:07 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)2025/05/23 21:55:16 error batch inserting data: ERROR: prepared statement "stmtcache_ff8a6b3e681096237b63a367a2c9c29264dd5bd3d5c1af9d" already exists (SQLSTATE 42P05)
复制代码


接下来是上面完整的代码:


package main
import ( "context" "fmt" "log" "strings" "time" "math/rand"
"github.com/jackc/pgx/v5")
func main() { url := fmt.Sprintf("postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s", "root", "8.154.42.14:26257", "printer_iot", "./certs/ca.crt", "./certs/client.root.crt", "./certs/client.root.key")
config, err := pgx.ParseConfig(url) if err != nil { log.Fatalf("error parsing connection configuration: %v", err) }
config.RuntimeParams["application_name"] = "batch_insert_all_tables" conn, err := pgx.ConnectConfig(context.Background(), config) if err != nil { log.Fatalf("error connecting to database: %v", err) } defer conn.Close(context.Background())
// 设置批量插入的参数 batchSize := 5000 // 每批次插入记录数 totalInserted := make(map[string]int)
for { // 为每个表执行批量插入 insertPrinterStatus(conn, batchSize, totalInserted) insertPrinterCounter(conn, batchSize, totalInserted) insertConsumableStatus(conn, batchSize, totalInserted) insertPrintJob(conn, batchSize, totalInserted) insertPaymentRecord(conn, batchSize, totalInserted) insertFirmwareUpgrade(conn, batchSize, totalInserted) insertHardwareCheck(conn, batchSize, totalInserted) }}
func insertPrinterStatus(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 59.038678+float64(i)*0.1, // temperature 1510+i, // fan_speed 83.219425+float64(i)*0.1, // print_head_temp 0.471526+float64(i)*0.01, // humidity "on", // power_status fmt.Sprintf("E%d", 2791+i), // error_code timestamp, // created_at "Model-7", // model "v3.0.3", // firmware_version fmt.Sprintf("printer_%04d", totalInserted["printer_status"]+i), // printer_id )
start := i * 11 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8, start+9, start+10) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO printer_status (timestamp, temperature, fan_speed, print_head_temp, humidity, power_status, error_code, created_at, model, firmware_version, printer_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting printer_status data: %v\n", err) return }
totalInserted["printer_status"] += batchSize log.Printf("成功插入 %d 条 printer_status 记录,总计: %d\n", batchSize, totalInserted["printer_status"])}
func insertPrinterCounter(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 10000+i, // total_pages 3000+i, // color_pages 7000+i, // grayscale_pages timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["printer_counter"]+i), // printer_id )
start := i * 6 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO printer_counter (timestamp, total_pages, color_pages, grayscale_pages, created_at, printer_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting printer_counter data: %v\n", err) return }
totalInserted["printer_counter"] += batchSize log.Printf("成功插入 %d 条 printer_counter 记录,总计: %d\n", batchSize, totalInserted["printer_counter"])}
func insertConsumableStatus(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, "toner", // type 80-i%20, // level 5000-i%1000, // estimated_pages timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["consumable_status"]+i), // printer_id fmt.Sprintf("consumable_%04d", totalInserted["consumable_status"]+i), // consumable_id )
start := i * 7 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO consumable_status (timestamp, type, level, estimated_pages, created_at, printer_id, consumable_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting consumable_status data: %v\n", err) return }
totalInserted["consumable_status"] += batchSize log.Printf("成功插入 %d 条 consumable_status 记录,总计: %d\n", batchSize, totalInserted["consumable_status"])}
func insertPrintJob(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, fmt.Sprintf("job_%04d", totalInserted["print_job"]+i), // job_id fmt.Sprintf("user_%04d", rand.Intn(1000)), // user_id "pdf", // file_type 10+rand.Intn(50), // page_count rand.Intn(10), // color_pages 1, // status "", // error_code 60+rand.Intn(300), // duration timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["print_job"]+i), // printer_id )
start := i * 11 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8, start+9, start+10) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO print_job (timestamp, job_id, user_id, file_type, page_count, color_pages, status, error_code, duration, created_at, printer_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting print_job data: %v\n", err) return }
totalInserted["print_job"] += batchSize log.Printf("成功插入 %d 条 print_job 记录,总计: %d\n", batchSize, totalInserted["print_job"])}
func insertPaymentRecord(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, 50.0+float64(rand.Intn(200)), // amount fmt.Sprintf("coupon_%04d", rand.Intn(1000)), // coupon_id 5.0+float64(rand.Intn(20)), // coupon_amount "wechat", // payment_method fmt.Sprintf("trans_%04d", totalInserted["payment_record"]+i), // transaction_id timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["payment_record"]+i), // printer_id fmt.Sprintf("user_%04d", rand.Intn(1000)), // user_id )
start := i * 9 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO payment_record (timestamp, amount, coupon_id, coupon_amount, payment_method, transaction_id, created_at, printer_id, user_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting payment_record data: %v\n", err) return }
totalInserted["payment_record"] += batchSize log.Printf("成功插入 %d 条 payment_record 记录,总计: %d\n", batchSize, totalInserted["payment_record"])}
func insertFirmwareUpgrade(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, "v2.0.0", // old_version "v3.0.0", // new_version true, // success "", // error_message 300+rand.Intn(600), // duration timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["firmware_upgrade"]+i), // printer_id )
start := i * 8 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO firmware_upgrade (timestamp, old_version, new_version, success, error_message, duration, created_at, printer_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting firmware_upgrade data: %v\n", err) return }
totalInserted["firmware_upgrade"] += batchSize log.Printf("成功插入 %d 条 firmware_upgrade 记录,总计: %d\n", batchSize, totalInserted["firmware_upgrade"])}
func insertHardwareCheck(conn *pgx.Conn, batchSize int, totalInserted map[string]int) { var values []interface{} var placeholders []string
for i := 0; i < batchSize; i++ { timestamp := time.Now().Format("2006-01-02 15:04:05.000Z07:00") values = append(values, timestamp, "normal", // print_head_check "success", // calibration "passed", // diagnostics "normal", // temperature 0.471526+float64(i)*0.01, // humidity 220.0+float64(rand.Intn(10)), // voltage timestamp, // created_at fmt.Sprintf("printer_%04d", totalInserted["hardware_check"]+i), // printer_id )
start := i * 9 + 1 placeholder := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)", start, start+1, start+2, start+3, start+4, start+5, start+6, start+7, start+8) placeholders = append(placeholders, placeholder) }
sql := fmt.Sprintf("INSERT INTO hardware_check (timestamp, print_head_check, calibration, diagnostics, temperature, humidity, voltage, created_at, printer_id) VALUES %s", strings.Join(placeholders, ","))
_, err := conn.Exec(context.Background(), sql, values...) if err != nil { log.Printf("error batch inserting hardware_check data: %v\n", err) return }
totalInserted["hardware_check"] += batchSize log.Printf("成功插入 %d 条 hardware_check 记录,总计: %d\n", batchSize, totalInserted["hardware_check"])}
复制代码

5.3.3 问题描述 - 其它集群机器没有数据同步:

上面通过 go 的脚本在跑数据,满心欢喜的去查询 kwdb-cluster-server002 和 kwdb-cluster-server003 的数据库实例的数据,但是查不了任何一张表,都是报错:“Error: no inbound stream connection”,只 kwdb-cluster-server001 有数据,有这是什么问题呢?


①. 创建完 3 台 KWDB 数据库实例后,也可以看到时序数据库 printer_iot 是可以同步过来的。


②. 创建完 7 张相关时序表也是可以同步到过来的。



没有办法,只能在 kwdb-cluster-server001 查看 logs 日志看看能不能发现问题呢?果然,看到这是这里提示大概意思是 grpc 连接不到 47.110.243.180 的 26257 端口和 8.154.31.180 的 26257 端口,但是安全组肯定是开放了的。


5.3.4 异常原因分析:

最后排查到,发现在插入的语句在创建证书的语句存在问题,需要将所有的 ip 都加入进来,不能只加 kwdb-cluster-server001 的服务器 IP 地址,将另外 2 台服务器的 IP 地址也加上即可,再生成 CA 相关证书,copy 到其它节点上再创建一下容器即可解决这个问题。



docker run --rm --privileged -v /kwdb/certs:/kaiwudb/certs -w /kaiwudb/bin kwdb/kwdb:2.2.0  \bash -c './kwbase cert create-ca --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \        ./kwbase cert create-client root --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key && \        ./kwbase cert create-node 127.0.0.1 localhost 0.0.0.0 8.154.42.14 47.110.243.180 8.154.31.6 --certs-dir=/kaiwudb/certs --ca-key=/kaiwudb/certs/ca.key'
复制代码


按上面的步骤重新进行删除容器再进行部署操作,再重复以上操作即可以完成新的环境重新进行部署,可以发现这个问题就能进行解决,3 台服务器的数据都已经进行同步了。

5.3.5 三台服务器实例数据同步对比:

上面解决了其它 2 台数据不同步的问题的,我们就可以进行大批量插入的动作,可以看到数据同步也是比较及时的,就算略微有一些差异也是在 1 秒内就能同步完成,基本可以说没有太大的数据延迟的问题:



①. 【查询期间无数据插入情况】第一次查询,3 台服务器都同时在 12:04:50 的时候进行查询,可以发现查询的表数据量都是 275000 条,都是一致的,在查询后马上就有数据插入的操作。


②. 【查询期间有数据插入情况】第二次查询,3 台服务器都同时在 12:05:00 的时候进行查询,可以发现查询的表数据量都是 275000 条,都是一致的,在查询前马上就有数据插入的操作,但是数据应该还没有插入到数据库中。


③. 【查询期间有数据插入情况】第三次查询,3 台服务器都同时在 12:01:10 的时候进行查询,可以发现查询的表数据量不一致的(差异不大),在查询前马上就有数据插入的操作,表示数据正在插入的过程。


同时,也可以观看以下视屏,可以发现在 3 台服务器数据同步的过程中,延迟不超过 1s,就算略微有一些差异也是在 1 秒内就能同步完成,基本可以说没有太大的数据延迟的问题,之前用某云时,主从数据库会存在 2-3s 的延迟,就是先注册马上登陆会提示“用户数据不存在”,只能使用延迟几秒提示“正在登录中”来解决这个问题。


通过查看 kwdb-cluster-server01 的数据库同步的日志,如下可以分析通过了 244 个 goroutines 在跑日志,将数据同步到其它 2 个节点中。


5.3.6 某一台服务器宕机模拟演示操练:

通过使用 docker stop 容器,可以将 KWDB 其中一台 kwdb_cluster_server002 暂时停止,通过查看集群节点状态,发现第二台 kwdb_cluster_server002 确实已经已经 is_available 是 flase 的。



但是还想通过 docker start 启动时,发现启动不了,查看 logs 日志发现识别有点问题,像是识别成了新增的节点 ,而不是原来的节点重新启动,向官方人员咨询也没有得到很好的回复,希望尽快修复这个问题点。


** INFO: initial startup completed.* Node will now attempt to join a running cluster, or wait for `kwbase init`.* Client connections will be accepted after this completes successfully.* Check the log file(s) for progress. *** WARNING: The server appears to be unable to contact the other nodes in the cluster. Please try:* * - starting the other nodes, if you haven't already;* - double-checking that the '--join' and '--listen'/'--advertise' flags are set up correctly;* - running the 'kwbase init' command if you are trying to initialize a new cluster.* * If problems persist, please see on KWDB web site.*KWDB node starting at 2025-05-24 05:45:50.226961358 +0000 UTC (took 37.4s)build:                2.2.0 @ 2025/03/31 07:20:02 (go1.16.15)sql:                 postgresql://root@8.154.31.6:26257?sslcert=%2Fkaiwudb%2Fcerts%2Fclient.root.crt&sslkey=%2Fkaiwudb%2Fcerts%2Fclient.root.key&sslmode=verify-full&sslrootcert=%2Fkaiwudb%2Fcerts%2Fca.crtRPC client flags:    ./kwbase <client cmd> --host=8.154.31.6:26257 --certs-dir=/kaiwudb/certslogs:                /kaiwudb/deploy/kaiwudb-container/logstemp dir:            /kaiwudb/deploy/kaiwudb-container/kwbase-temp023690807external I/O path:   /kaiwudb/deploy/kaiwudb-container/externstore[0]:            path=/kaiwudb/deploy/kaiwudb-containerstorage engine:      rocksdbstatus:              initialized new node, joined pre-existing clusterclusterID:           1e886e85-df09-4ba1-abfc-07767fb6382bnodeID:              2initiating graceful shutdown of server** ERROR: Cluster expansion feature needs an enterprise license to enable.*
复制代码


另外,在其它节点(kwdb_cluster_server002、kwdb_cluster_server003)的 SQL shell 控制台中进行查询会出现以下的报错,但是在 kwdb_cluster_server001 没有发现此问题,不过,通过程序查询 SQL 没有这种问题出现。


本文总结:



上面是基于 Docker 的 KWDB TLS 安全部署集群,并且使用物联网共享打印机的最佳落地方案实践顺利的完成,搭建了 3 台节点集群并完成了时序数据库、时序数据表的创建、访问、同步的各种场景测试,同时,体验了 KWDB 分布式多模数据库在 IoT 的强大的分布式能力与数据插入高性能的特点。


发布于: 刚刚阅读数: 6
用户头像

KaiwuDB

关注

还未添加个人签名 2021-04-29 加入

KaiwuDB 是浪潮集团控股的数据库企业,公司汇聚了全球顶尖的数据库人才,以多模数据库为核心产品,面向工业物联网、数字能源、交通车联网、智慧产业等各大行业领域,提供领先创新的数据服务软件。

评论

发布
暂无评论
KWDB多模分布式数据库助力共享打印机物联网IoT最佳实践落地,实现高效存储与查询时序数据_KaiwuDB_InfoQ写作社区