作者:完美句号
原文链接:https://blog.csdn.net/wanmeijuhao/article/details/147965173?spm=1001.2014.3001.5501
一、序列概括:
随着物联网技术在各行业的使用将越来越广,物联网与人工智能的深度结合,特别是在智能家居、智慧城市、工业 4.0 等领域有着广泛的应用,AIoT(Artificial Intelligence of Things)正成为企业实现智能化转型的关键路径。
上面一篇长文已经介绍了 KWDB 的安装以及踩过的坑,而且也对 TSBS 专门针对时间序列场景做了压力测试。
接下来,将从第 2 个方向“场景案例”一起来探索公司实际物联网 IoT 项目的思考,是否能让 KWDB 分布式多模数据库碰撞出不一样的火花呢?
二、正文前言:
本人在学校毕业后一直从事开发工作,经过了以下几个阶段的软件开发历程,从早期的 Web 时代编程、到云时代分布式编程,到如今的物联网 AIoT 时代,都是解决企业级项目开发的效率问题,通过不断的技术演进,来为公司带来降本增效的原则。
2.1 传统 Web 时代编程开发时代:
传统 Web 时代编程开发时代是一个以静态网页为主,依赖 HTML、CSS 和 JavaScript 等技术栈,通过服务器端处理逻辑和数据库交互,实现网页动态内容和交互功能的时代。
2.2 云时代分布式编程时代:
云时代分布式编程时代,强调利用云计算平台进行数据的分布式处理和应用程序的编写与开发。依托分布式架构构建高可用服务集群,以服务化 SaaS 交付模式降低 IT 运维复杂度,推动企业从固定硬件投入向灵活智能的数字化服务转型。
2.3 物联网 AIoT 时代:
物联网 AIoT 时代,就像给世界装上“超级感官”与“智慧大脑”——机器之间能“对话”,数据像血液般流动:红灯预判车流、工厂设备自主巡检,甚至冰箱会提醒牛奶过期;万物从沉默到觉醒,效率与便利在指尖悄然生长,生活像被施了魔法,一切变得更聪明、更贴心。
三、物联网 AIoT 时代数据管理的新途径 – “时序数据库”:
KWDB 的时序数据库(Timeseries Database)和关系型数据库(Relational Database)使用不同的存储引擎和元数据管理方式:
①. 时序数据库:专门存储带时间戳的序列化数据,要求表结构包含主时间戳字段和标签字段。
②. 关系型数据库:支持传统关系模型,可定义主键、外键、索引等约束,适用于非时间序列数据。
KWDB 正在成为新一代物联网与智能系统的核心数据引擎。如果您正面临复杂的数据接入、多模数据管理或实时计算需求,不妨尝试一下这款专为 AIoT 设计的数据库产品,拥抱真正面向未来的数据基础设施。
四、企业内部物联网 IoT 净水机业务平台方案思考:
公司企业的净水机业务就是采用了物联网 IoT 的设计方案,能够实时对设备的运行状态、水质状态以及滤芯状态进行全面而精确的监测。通过高效的数据传输机制,该系统实现了对设备的智能化管理,包括:
①. 同步水卡消费记录、商家或设备的记录。
②. 查询 TDS 值、CPF/DF/CF 值上传。
③. 远程开关机、重启机器、机器预警、调水费。
④. 滤芯寿命、水温、水量、pH 值、电导率、温度。
五、KWDB 在净水机物联网 IoT 方案落地最佳实践:GoLang 应用程序 + 香橙派 Orange Pi AI Pro 开发板场景案例测试:
在实际项目中,我们通过开发语言使用数据库提供的 SDK 连接到 KWDB 数据库实例进行 CURD 的操作,可以看到这里分为 2 种场景:
①. 应用程序配置连接器 KWDB,可以看到官网提供了 10 几种不同应用程序连接器的应用开发。
②. 第三方工具与 KWDB 数据读写同步,有分布式消息中间件、离线数据同步工具、开源指标收集工具的数据读写同步。
5.1 使用 pgx 驱动连接 KWDB:
KWDB 支持可以通过 pgx 驱动连接数据库,并执行创建、插入和查询操作。下面代码演示如何使用 Go 语言通过 pgx 驱动连接 KWDB。
pgx 是用 Go 语言编写的 PostgreSQL 驱动和工具包,提供了高性能的低级接口,支持用户直接利用 PostgreSQL 的特性,pgx 还包含一个适配器,与标准的数据库或 SQL 接口兼容,方便开发者进行数据库操作。
go get github.com/jackc/pgx/v5@v5.5.1
复制代码
5.2 香橙派 Orange Pi AI Pro 开发板:
香橙派 Orange Pi AI Pro 开发板凭借其硬件性能与生态适配能力,在物联网(IoT)领域展现了显著优势,开发板通过高性能 AI 加速能力与灵活扩展性,提供了强大的训练能力,优异的推理能力,可以满足大多数 AI 算法原型验证、推理应用开发的需求,可以构建智能物联网节点的理想硬件平台。
以下是简单的香橙派 Orange Pi AI Pro 开发板相关硬件设备(电源线、开发板、TF 卡),经过几分钟简单的硬件组装后,因为需要进行调试,所以,插入相关的 USB 和 HDMI 插头后,开机登陆后即可进入桌面。
5.3 Golang 模拟测试场景实践:
在香橙派 Orange Pi AI Pro 开发板上,跑一个基于 Go 语言开发的物联网净水器性能压测系统,用于模拟大量设备并发上传数据和请求操作的场景。
5.3.1 KWDB - 时序数据库:
在数据库管理中,KWDB 的时序数据库(Time-Series Database, TSD)是一种专门用于存储和管理随时间变化的数据的数据库。这类数据库特别适用于物联网(IoT)、监控系统、日志分析、金融分析等领域,能够高效地处理大量时间序列数据。
首先我们需要新建一个测试使用的数据库,这里需要注意一下,跟平时的建表语句不同的是,需要携带一个 TS 的参数标识,表示是 Time-Series 时序数据库。
这里的分区时间范围比较实用,会进行定期检测删除,删除的依据就是分区中最新的数据已经超过了保存周期的话就执行删除,比较类似于 Redis 的“定时删除策略”,会创建一个定时事件,当时间到达时,由事件处理器自动执行 key 的删除操作相当于会维护一个进程去进行检测。
# 创建iot时序数据库成功
CREATE TS DATABASE iot RETENTIONS 50d PARTITION INTERVAL 2d;
# 查看所有数据库
show databases;
# 查看时序数据库的建库语句
show create database iot;
# 切换指定的数据库
use iot;
复制代码
这种数据库的特点,非常适合我们的这种业务,比如在业务的设计中存在“冷热数据”,MongoDB 的存在是物联网最新上传的数据,老数据会针对性的进行数据刷选备份到 MySQL 进行存档保存。
5.3.2 KWDB - 时序表:
KWDB 时序表(TIME SERIES TABLE)是用于存储时间序列数据的数据表,在与平常接触的建表语句不太一样,而且有些数据类型、函数也是不支持的,具体支持的数据类型可以直接查看官方的文档 - 时序数据类型。
时序表跟关系型常规的建表语句有一些差异性,这里我做了一些总结,一张表包含 3 种类型的列:
①. 时间戳列:用于记录数据采集的时间。
②. 标签列:用于记录采集对象的静态数据,比如设备编号、标识位置、设备的型号,固定不变的数据。
③. 字段列:用于记录采集对象的实时数据,比如一直变化的数据,设备产生了不同的时间的收费、电流、电压。
接着创建 2 张时序表,一张时序表是记录净水机设备相关信息,比如这台净水机设备实时相关设备数据:TDS 值、CPF/DF/CF 值、pH 值、水温、水量、电导率、滤芯寿命、设备状态等信息。
CREATE TABLE device_data (
timestamp timestamptz NOT NULL,
tds float NOT NULL,
cpf float NOT NULL,
df float NOT NULL,
cf float NOT NULL,
ph float NOT NULL,
temperature float NOT NULL,
water_volume float NOT NULL,
filter_life INTEGER NOT NULL,
conductivity float NOT NULL,
power_status BOOLEAN NOT NULL,
created_at timestamptz NOT NULL default now()
) TAGS (
device_id VARCHAR(50) NOT NULL
) PRIMARY TAGS (device_id);
复制代码
另外一张时序表是与消费相关的,主要记录净水机设备交易数据,比如水卡消费记录、商家/设备记录。
CREATE TABLE consumption_record (
timestamp timestamptz NOT NULL,
amount float NOT NULL,
created_at timestamptz NOT NULL default now()
) TAGS (
device_id VARCHAR(50) NOT NULL,
card_id VARCHAR(50) NOT NULL
) PRIMARY TAGS (device_id);
复制代码
5.4 Golang 模拟压测脚本
上面可以看到数据库相关的准备工作已经就绪,接下来就是编写 go 相关代码,以下是 go 代码压测方案的示意图:
①. 在香橙派 Orange Pi AI Pro 开发板上运行 go 压测程序,支持动态并发数、测试持续时间、模拟设备数量,上传支持水卡消费记录同步和模拟设备数据指标(TDS 值、CPF/DF/CF 值、pH 值等)。
②. 我们这次压力测试的是 KWDB 单节点版本安全模式下进行测试,推荐在集群环境进行测试,这样会更容易贴近生产环境。
③. 最后通过 KWDB 平台集成部署 Prometheus 和 Grafana,配置好告警规则和聚合规则配置文件,即可在 Grafana 中查看相关监控指标数据。
监控部署,可以直接查看官方的手册进行安装与配置(https://www.kaiwudb.com/kaiwudb_docs/#/db-monitor/os-monitor-component/deploy-monitoring.html)。
生成模拟设备数据指标(TDS 值、CPF/DF/CF 值、pH 值等),和生成模拟支持水卡消费记录的 Mock 数据:
// generateDeviceData 生成模拟设备数据
func generateDeviceData(deviceID string) db.DeviceData {
return db.DeviceData{
DeviceID: deviceID,
Timestamp: time.Now(),
TDS: rand.Float64() * 1000,
CPF: rand.Float64() * 100,
DF: rand.Float64() * 100,
CF: rand.Float64() * 100,
PH: rand.Float64()*6 + 1, // pH 1-7
Temperature: rand.Float64()*30 + 10, // 10-40度
WaterVolume: rand.Float64() * 1000,
FilterLife: rand.Intn(100),
Conductivity: rand.Float64() * 2000,
PowerStatus: rand.Intn(2) == 1,
}
}
// generateConsumptionRecord 生成模拟消费记录
func generateConsumptionRecord(deviceID string) db.ConsumptionRecord {
return db.ConsumptionRecord{
CardID: fmt.Sprintf("CARD_%d", rand.Intn(1000)),
DeviceID: deviceID,
Amount: rand.Float64() * 100,
Timestamp: time.Now(),
}
}
复制代码
在 main.go 中,可以定义 TestConfig 结构体的参数来调整测试配置,如 Concurrency(并发数)、Duration:(测试持续时间)、DeviceCount(模拟设备数量)、ReportInterval(报告输出间隔),并且通过使用 goroutine 池可以管理并发数量,还可以通过 channel 控制并发数量,使用 sync.Mutex 确保指标统计的准确性。
// 定期输出连接池状态
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
stats := kwdb.GetStats()
log.Printf("连接池状态 - 总连接数: %d, 空闲连接数: %d, 使用中连接数: %d\n",
stats.TotalConns(), stats.IdleConns(), stats.TotalConns()-stats.IdleConns())
}
}
}()
// 测试配置
config := TestConfig{
Concurrency: 100,
Duration: 1 * time.Minute,
DeviceCount: 1000,
ReportInterval: 5 * time.Second,
}
// 初始化测试指标
metrics := &TestMetrics{}
// 创建等待组
var wg sync.WaitGroup
// 开始时间
startTime := time.Now()
// 定期报告测试进度
go func() {
for {
time.Sleep(config.ReportInterval)
if time.Since(startTime) >= config.Duration {
break
}
log.Printf("进行中 - 总请求: %d, 成功: %d, 失败: %d, 平均延迟: %v\n",
metrics.TotalRequests,
metrics.SuccessRequests,
metrics.FailedRequests,
metrics.AverageLatency)
}
}()
// 主测试循环
for time.Since(startTime) < config.Duration {
// 控制并发数
if metrics.TotalRequests%int64(config.Concurrency) == 0 {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
deviceID := fmt.Sprintf("DEVICE_%d", rand.Intn(config.DeviceCount))
go simulateDevice(deviceID, metrics, &wg, kwdb)
}
// 等待所有goroutine完成
wg.Wait()
复制代码
注意:时间字段去掉 CURRENT_TIMESTAMP (且设置了 not_null),在插入数据时创建时间是空,这里就会提示报错提示:
ERROR: null value in column "created_at" violates not-null constraint (SQLSTATE 23502)
复制代码
另外,也在思考,如果多台设备同时运行,肯定会存在并发的问题,上面可以看到第一列是时间列,而且是默认的索引,那么会不会在同一时间段,产生多条时间一样的数据呢?这样会不会存在问题呢?会不会产生时间一样的冲突呢?带着这个问题,我们可以做以下的测试:
结果,可以得出结论:时间对于不同设备来说是可以重复的 ,同一个设备不行,因为同一设备不可能在同一时间产生 2 个值。
相关代码:
package main
import (
"fmt"
"log"
"math/rand"
"sync"
"time"
"trae/db"
"trae/config"
)
// TestConfig 测试配置
type TestConfig struct {
Concurrency int `json:"concurrency"`
Duration time.Duration `json:"duration"`
DeviceCount int `json:"device_count"`
ReportInterval time.Duration `json:"report_interval"`
}
// TestMetrics 测试指标
type TestMetrics struct {
TotalRequests int64
SuccessRequests int64
FailedRequests int64
AverageLatency time.Duration
MaxLatency time.Duration
MinLatency time.Duration
mutex sync.Mutex
}
// generateDeviceData 生成模拟设备数据
func generateDeviceData(deviceID string) db.DeviceData {
return db.DeviceData{
DeviceID: deviceID,
Timestamp: time.Now(),
TDS: rand.Float64() * 1000,
CPF: rand.Float64() * 100,
DF: rand.Float64() * 100,
CF: rand.Float64() * 100,
PH: rand.Float64()*6 + 1, // pH 1-7
Temperature: rand.Float64()*30 + 10, // 10-40度
WaterVolume: rand.Float64() * 1000,
FilterLife: rand.Intn(100),
Conductivity: rand.Float64() * 2000,
PowerStatus: rand.Intn(2) == 1,
}
}
// generateConsumptionRecord 生成模拟消费记录
func generateConsumptionRecord(deviceID string) db.ConsumptionRecord {
return db.ConsumptionRecord{
CardID: fmt.Sprintf("CARD_%d", rand.Intn(1000)),
DeviceID: deviceID,
Amount: rand.Float64() * 100,
Timestamp: time.Now(),
}
}
// simulateDevice 模拟单个设备的数据上传
func simulateDevice(deviceID string, metrics *TestMetrics, wg *sync.WaitGroup, kwdb *db.KWDB) {
defer wg.Done()
start := time.Now()
data := generateDeviceData(deviceID)
consumption := generateConsumptionRecord(deviceID)
// 模拟数据上传延迟
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
// 保存到KWDB
var saveErr, saveErr2 error
saveErr = kwdb.SaveDeviceData(&data)
saveErr2 = kwdb.SaveConsumptionRecord(&consumption)
metrics.mutex.Lock()
metrics.TotalRequests++
if saveErr == nil && saveErr2 == nil {
metrics.SuccessRequests++
} else {
metrics.FailedRequests++
if saveErr != nil {
log.Printf("设备数据保存失败: %v", saveErr)
}
if saveErr2 != nil {
log.Printf("消费记录保存失败: %v", saveErr2)
}
}
latency := time.Since(start)
if metrics.MaxLatency < latency {
metrics.MaxLatency = latency
}
if metrics.MinLatency == 0 || metrics.MinLatency > latency {
metrics.MinLatency = latency
}
metrics.AverageLatency = time.Duration(int64(metrics.AverageLatency+latency) / 2)
metrics.mutex.Unlock()
}
func main() {
// 设置随机数种子
rand.Seed(time.Now().UnixNano())
// 加载数据库配置
dbConfig, err := config.LoadConfig("config/database.yaml")
if err != nil {
log.Fatalf("加载数据库配置失败: %v", err)
}
// 初始化数据库连接
kwdb, err := db.NewKWDB(&dbConfig.Postgresql)
if err != nil {
log.Fatalf("连接数据库失败: %v", err)
}
// 注册优雅关闭
defer func() {
log.Println("正在关闭数据库连接...")
kwdb.Close()
log.Println("数据库连接已关闭")
}()
// 定期输出连接池状态
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
stats := kwdb.GetStats()
log.Printf("连接池状态 - 总连接数: %d, 空闲连接数: %d, 使用中连接数: %d\n",
stats.TotalConns(), stats.IdleConns(), stats.TotalConns()-stats.IdleConns())
}
}
}()
// 测试配置
config := TestConfig{
Concurrency: 100,
Duration: 1 * time.Minute,
DeviceCount: 1000,
ReportInterval: 5 * time.Second,
}
// 初始化测试指标
metrics := &TestMetrics{}
// 创建等待组
var wg sync.WaitGroup
// 开始时间
startTime := time.Now()
// 定期报告测试进度
go func() {
for {
time.Sleep(config.ReportInterval)
if time.Since(startTime) >= config.Duration {
break
}
log.Printf("进行中 - 总请求: %d, 成功: %d, 失败: %d, 平均延迟: %v\n",
metrics.TotalRequests,
metrics.SuccessRequests,
metrics.FailedRequests,
metrics.AverageLatency)
}
}()
// 主测试循环
for time.Since(startTime) < config.Duration {
// 控制并发数
if metrics.TotalRequests%int64(config.Concurrency) == 0 {
time.Sleep(100 * time.Millisecond)
}
wg.Add(1)
deviceID := fmt.Sprintf("DEVICE_%d", rand.Intn(config.DeviceCount))
go simulateDevice(deviceID, metrics, &wg, kwdb)
}
// 等待所有goroutine完成
wg.Wait()
// 输出最终测试报告
log.Printf("\n测试完成\n")
log.Printf("总请求数: %d\n", metrics.TotalRequests)
log.Printf("成功请求: %d\n", metrics.SuccessRequests)
log.Printf("失败请求: %d\n", metrics.FailedRequests)
log.Printf("平均延迟: %v\n", metrics.AverageLatency)
log.Printf("最大延迟: %v\n", metrics.MaxLatency)
log.Printf("最小延迟: %v\n", metrics.MinLatency)
log.Printf("QPS: %.2f\n", float64(metrics.TotalRequests)/config.Duration.Seconds())
}
复制代码
下面是 KWDB 数据库连接的代码:
package db
import (
"context"
"fmt"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"trae/config"
)
// KWDB 数据库连接管理器
type KWDB struct {
pool *pgxpool.Pool
config *config.DatabaseConfig
ctx context.Context
cancel context.CancelFunc
mutex sync.RWMutex
isClosing bool
}
// NewKWDB 创建新的KWDB实例
func NewKWDB(dbConfig *config.DatabaseConfig) (*KWDB, error) {
ctx, cancel := context.WithCancel(context.Background())
// 创建连接池配置
poolConfig, err := pgxpool.ParseConfig(dbConfig.GetDSN())
if err != nil {
cancel()
return nil, fmt.Errorf("解析数据库配置失败: %v", err)
}
// 应用连接池设置
poolConfig.MaxConns = int32(dbConfig.Pool.MaxConns)
poolConfig.MinConns = int32(dbConfig.Pool.MinConns)
poolConfig.MaxConnLifetime = dbConfig.Pool.MaxConnLifetime
poolConfig.MaxConnIdleTime = dbConfig.Pool.MaxConnIdleTime
poolConfig.HealthCheckPeriod = dbConfig.Pool.HealthCheckPeriod
// 创建连接池(带重试机制)
var pool *pgxpool.Pool
for retries := 3; retries > 0; retries-- {
pool, err = pgxpool.NewWithConfig(ctx, poolConfig)
if err == nil {
break
}
if retries > 1 {
time.Sleep(time.Second * 2)
}
}
if err != nil {
cancel()
return nil, fmt.Errorf("连接数据库失败: %v", err)
}
// 创建KWDB实例
db := &KWDB{
pool: pool,
config: dbConfig,
ctx: ctx,
cancel: cancel,
mutex: sync.RWMutex{},
isClosing: false,
}
// 启动健康检查
go db.healthCheck()
return db, nil
}
// healthCheck 定期检查数据库连接健康状态
func (db *KWDB) healthCheck() {
ticker := time.NewTicker(db.config.Pool.HealthCheckPeriod)
defer ticker.Stop()
for {
select {
case <-db.ctx.Done():
return
case <-ticker.C:
if err := db.pool.Ping(db.ctx); err != nil {
fmt.Printf("数据库健康检查失败: %v\n", err)
}
}
}
}
// Close 优雅关闭数据库连接
func (db *KWDB) Close() {
db.mutex.Lock()
db.isClosing = true
db.mutex.Unlock()
// 取消上下文
db.cancel()
// 等待所有连接完成
time.Sleep(time.Second * 5)
// 关闭连接池
db.pool.Close()
}
// GetStats 获取连接池统计信息
func (db *KWDB) GetStats() *pgxpool.Stat {
return db.pool.Stat()
}
// SaveDeviceData 保存设备数据
func (db *KWDB) SaveDeviceData(data *DeviceData) error {
db.mutex.RLock()
if db.isClosing {
db.mutex.RUnlock()
return fmt.Errorf("数据库连接正在关闭")
}
db.mutex.RUnlock()
// 重试机制
var err error
for retries := 3; retries > 0; retries-- {
// 开启事务
tx, err := db.pool.Begin(db.ctx)
if err != nil {
if retries > 1 {
time.Sleep(time.Second)
continue
}
return fmt.Errorf("开启事务失败: %v", err)
}
// 执行插入
_, err = tx.Exec(db.ctx,
"INSERT INTO device_data (device_id, timestamp, tds, cpf, df, cf, ph, temperature, water_volume, filter_life, conductivity, power_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
data.DeviceID, data.Timestamp, data.TDS, data.CPF, data.DF, data.CF, data.PH, data.Temperature, data.WaterVolume, data.FilterLife, data.Conductivity, data.PowerStatus)
if err != nil {
tx.Rollback(db.ctx)
if retries > 1 {
time.Sleep(time.Second)
continue
}
return fmt.Errorf("保存设备数据失败: %v", err)
}
// 提交事务
if err = tx.Commit(db.ctx); err != nil {
if retries > 1 {
time.Sleep(time.Second)
continue
}
return fmt.Errorf("提交事务失败: %v", err)
}
break
}
return err
}
// SaveDeviceDataBatch 批量保存设备数据
func (db *KWDB) SaveDeviceDataBatch(dataList []*DeviceData) error {
if len(dataList) == 0 {
return nil
}
db.mutex.Lock()
defer db.mutex.Unlock()
// 开启事务
tx, err := db.pool.Begin(db.ctx)
if err != nil {
return err
}
defer tx.Rollback(db.ctx)
// 批量插入数据
for _, data := range dataList {
_, err := tx.Exec(db.ctx, "INSERT INTO device_data (device_id, timestamp, tds, cpf, df, cf, ph, temperature, water_volume, filter_life, conductivity, power_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
data.DeviceID, data.Timestamp, data.TDS, data.CPF, data.DF, data.CF, data.PH, data.Temperature, data.WaterVolume, data.FilterLife, data.Conductivity, data.PowerStatus)
if err != nil {
return fmt.Errorf("批量插入设备数据失败: %v", err)
}
}
return tx.Commit(db.ctx)
}
// SaveConsumptionRecord 保存消费记录
func (db *KWDB) SaveConsumptionRecord(record *ConsumptionRecord) error {
db.mutex.Lock()
defer db.mutex.Unlock()
// 插入消费记录到PostgreSQL
_, err := db.pool.Exec(db.ctx,
"INSERT INTO consumption_record (card_id, device_id, amount, timestamp) VALUES ($1, $2, $3, $4)",
record.CardID, record.DeviceID, record.Amount, record.Timestamp)
return err
}
// SaveConsumptionRecordBatch 批量保存消费记录
func (db *KWDB) SaveConsumptionRecordBatch(records []*ConsumptionRecord) error {
if len(records) == 0 {
return nil
}
db.mutex.Lock()
defer db.mutex.Unlock()
// 开启事务
tx, err := db.pool.Begin(db.ctx)
if err != nil {
return err
}
defer tx.Rollback(db.ctx)
// 批量插入记录
for _, record := range records {
_, err := tx.Exec(db.ctx, "INSERT INTO consumption_record (card_id, device_id, amount, timestamp) VALUES ($1, $2, $3, $4)",
record.CardID, record.DeviceID, record.Amount, record.Timestamp)
if err != nil {
return fmt.Errorf("批量插入消费记录失败: %v", err)
}
}
return tx.Commit(db.ctx)
}
// Ping 测试连接是否正常
func (db *KWDB) Ping() error {
return db.pool.Ping(db.ctx)
}
// ReConnect 重新连接
func (db *KWDB) ReConnect() error {
db.mutex.Lock()
defer db.mutex.Unlock()
// 关闭旧连接池
db.pool.Close()
// 创建新的连接池
pool, err := pgxpool.New(db.ctx, db.pool.Config().ConnString())
if err != nil {
return fmt.Errorf("重新连接失败: %v", err)
}
db.pool = pool
return nil
}
复制代码
5.5 压测结果指标数据分析:
上面我们一共进行了 4 次的压力测试,可以使用 Grafana 支持查看 KWDB 集群及各个节点的监控指标,包括指标概览、硬件指标、运行指标、SQL 指标、存储指标、副本指标、分布式指标、队列指标和慢查询指标。
以下不是所有指标都来查看,我们只调一些比较重要的指标来进一步分析与总结。
5.5.1 概览页面指标 - SQL Queries 指标:
在以下节点视图中,SQL Queries 指标表示该时间序列图展示指定节点处理客户端请求的 QPS(Queries Per Second,每秒查询数)。并且支持查询的类型包括查询、更新、插入、删除。
采样值为 10 秒内的平均值。在如下集群视图中,该时间序列图展示当前集群查询负载的估计值,该估计值为每个节点最近 10 秒的活动情况的汇总值。可以在看到在以下 4 次压测中,在 Inserts 的操作最高值分别为:
①. 第一次压测时,在时间段为 2025-05-08 02:19:45 时,Inserts 操作达到了 4.15K 每秒。
② 第二次压测时,在时间段为 2025-05-08 02:22:15 时,Inserts 操作达到了 2.87K 每秒。
③ 第三次压测时,在时间段为 2025-05-08 02:42:30 时,Inserts 操作达到了 3.29K 每秒。
④ 第四次压测时,在时间段为 2025-05-08 02:52:45 时,Inserts 操作达到了 3.74K 每秒。
5.5.2 概览页面指标 - Service Latency: SQL 99th percentile:
在数据库管理中,服务延迟(Service Latency)指的是数据库操作(如查询、事务等)的执行时间,即集群从接收到查询请求到查询结束之间的时间,不包含将查询结果传输给客户端的时间。
特别是在高并发环境下,了解 99th 百分位延迟(也称为 99 百分位延迟或最长延迟)对于优化数据库性能至关重要。99th 百分位延迟指的是在最坏情况下,有 1%的请求超过了此延迟时间。
该时间序列图展示指定节点或者集群内所有节点的服务延迟的 99th 百分位数,即在观察时间内,百分之九十九(99%)的节点的服务延迟低于或等于这个值,这里可以看到最高的是 369ms。
5.5.3 硬件页面指标 – Capacity & Memory Usage:
Capacity 表示磁盘容量,这个指标可以显示磁盘的使用情况,包括总容量、已用容量和剩余容量等,帮助管理磁盘空间。用户可以通过监控存储容量图来判断什么时候需要为集群添加新的存储空间,比如如下,可以实时去查看时序数据库和关系型数据库磁盘空间占比,可以有针对性的进行调整。
Memory Usage 表示内存容量,随着 KWDB 加载的数据源数据量增加、缓存的图表数据增多以及运行的插件功能增多,内存使用量会相应变化。这个指标用于及时发现内存使用是否超出合理范围,避免因内存耗尽导致 KWDB 服务中断,如果内存使用率持续上升且无合理回落,可能暗示存在内存泄漏等问题。
如上所示,可以看到我们实验的机器设备是 8G,但是在压测的过程中,最高才使用到内存总量为 2.02G,只占用了 25%左右。
5.5.4 运行时页面指标 - GC Pause Time:
GC Pause Time 指标指的是垃圾回收(Garbage Collection,简称 GC)暂停时间,即在进行垃圾回收时,程序的工作线程被暂停的时间。GC Pause Time 指标反映了垃圾回收过程中应用程序暂停的时间长度。这个指标对于评估垃圾回收对应用程序性能的影响非常重要。较长的暂停时间会降低应用程序的响应速度和吞吐量,尤其是在交互式应用中,长时间的暂停会导致用户体验下降。因此,优化垃圾回收策略,减少暂停时间,是提高应用程序性能的关键。
在下面节点视图中,该时间序列图展示指定节点的 GC 阻塞时间。在集群视图中,该时间序列图展示集群中所有节点的 GC 阻塞时间总和,可以看到整个生命周期维持在 2ms 左右,最高是 8.58ms。
GC Runs 表示该时间序列图展示指定节点或者集群内所有节点的 GC 运行次数,可以看到平均稳定在 0.5 次左右,最高 GC 运行次数在 1.27。
六、Java 优化批量数据写入:
由于上面插入的效果没有实际体现出 KWDB 时序数据库的优势,经过官方的沟通,同时也说出了我代码的几个缺陷问题,这里我们以官方的示例来演示一下。
KaiwuDB JDBC 是 KWDB 的官方 Java 语言连接器,基于 PgJDBC 扩展实现,符合 JDBC 4.0、JDBC 4.1 和 JDBC 4.2 规范。Java 开发人员可以使用 KaiwuDB JDBC 驱动程序连接 KWDB 的服务进程,进行数据增删改查操作。使用批量接口写入数据时,如果待写入的值与列的数据类型不符或者待写入的字段不存在,KaiwuDB JDBC 会返回成功写入条数、写入失败条数,并将具体错误信息记录到日志中。
KaiwuDB JDBC 提供了传统的批量执行 SQL 接口,用户可以通过手动拼接 SQL 实现批量数据写入,同时提供了 addBatchInsert、executeBatchInsert 和 clearBatchInsert 接口,能够将同一张时序表的多次数据写入合并到一条 SQL 语句,降低 CPU 占用,提升写入性能。
注意:目前,批量写入功能只适用于 KWDB 单机版本。
6.1 版本说明:
①. 安装 openJDK(1.8 及以上版本)。
②. 安装 Maven(3.6 及以上版本)。
③. 安装 KWDB 2.2.0 数据库、配置数据库认证方式、创建数据库。
④. 获取 KWDB JDBC 驱动包。
6.2 配置数据库:
测试使用的数据库是阿里云的 ECS,配置是 16 核与 32G 的机器配置:
用时序写入短接功能,该功能默认关闭。启用后可以直接将数据写入存储,减少中间处理环节,提高性能。
# 为当前会话启用时序写入短接功能。
SET SESSION tsinsert_direct=true;
#为 KaiwuDB 集群启用时序写入短接功能:
SET CLUSTER SETTING server.tsinsert_direct.enabled = TRUE;
#允许写入时跳过错误数据,正常写入其他数据:
SET SESSION ts_ignore_batcherror=true;
复制代码
6.3 配置连接:
在 pom.xml 中添加依赖,将 KaiwuDB JDBC 引入 Java 项目:
<dependency>
<groupId>com.kaiwudb</groupId>
<artifactId>kaiwudb-jdbc</artifactId>
<version>2.2.0</version>
</dependency>
复制代码
创建一下时序表:
CREATE TABLE tsdb.tbl_raw_1 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(20) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);
复制代码
6.4 代码演示:
package com.winterchen;
import com.kaiwudb.jdbc.KwStatement;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@SpringBootApplication
public class SpringbootMybatisDemo2Application {
public static void main(String[] args) {
String url = "jdbc:kaiwudb://47.111.23.173:26257/tsdb?preferQueryMode=simple";
String user = "root";
String password = "123456";
try (Connection connection = DriverManager.getConnection(url, user, password)) {
KwStatement statement = (KwStatement) connection.createStatement();
long timestamp = 1731373200000L; // 2024-11-12 09:00:00.000 初始时间戳
for (int i = 0; i < 1000; i++) {
// 循环1000次,每次写入1000行数据,共计100万行数据;每次循环插入20个设备,每个设备50行的数据
for (int row = 1; row <= 999; row++) {
int index = (row - 1) % 10 + 1;
long finalTime = timestamp + (row * 1000L) + (i * 50 * 1000L);
for (int num = 1; num <= 999; num++) {
String device = "device_" + num;
String iot = "iot_" + num;
statement.addBatchInsert(finalTime, ("tbl_raw_1"),
new LinkedHashMap<String, Object>() {{
put("ts", finalTime);
put("data", ThreadLocalRandom.current().nextDouble());
put("type", "t_001");
put("parse", UUID.randomUUID() + "'123");
}},
new LinkedHashMap<String, Object>() {{
put("device", device);
put("iot_hub_name", iot);
}});
}
}
// execute batch insert sql data
long inserTime = System.currentTimeMillis();
int[] updateCounts = statement.executeBatchInsert();
int totalCount = 0;
for (int count: updateCounts) {
totalCount += count;
}
long insertEndTime = System.currentTimeMillis() - inserTime;
System.out.println("插入数据时间:" + insertEndTime + "插入了数据:" + totalCount + "条");
// clear batch insert temp data
statement.clearBatchInsert();
}
// close statement
statement.close();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
}
复制代码
通过对上面的数据进行总结分析如下,可以看到每秒插入的数据可以平均在 1.7w 左右,其实还可以加大,但是由于我机器的原因,只能到最大一次性插入 10w 条数据,有更好机器的朋友可以尝试一下。
这里也可能与网速有关系,公司的网可能会限速,我使用自己的 5G 就可以达到上面的测试标准数据。
七、跨模查询:
跨模查询是一种用于检索相关联数据的查询方法,通常用于在不同类型的数据库之间进行查询,例如在关系数据库和时序数据库之间检索相关的数据,KWDB 跨模查询支持对关系表和时序表进行关联查询、嵌套查询、联合查询。
KWDB 跨模查询支持以下关联查询:
内连接(INNER JOIN)
左连接(LEFT JOIN)
右连接(RIGHT JOIN)
全连接(FULL JOIN)
KWDB 跨模查询支持以下嵌套查询:
相关子查询(Correlated Subquery):内部查询依赖于外部查询的结果,每次外部查询的都触发内部查询的执行。
非相关子查询(Non-Correlated Subquery):内部查询独立于外部查询,只执行一次内部查询并返回固定的结果。
相关投影子查询(Correlated Scalar Subquery): 内部查询依赖于外部查询的结果,并且只返回一个单一的值作为外部查询的结果。
非相关投影子查询(Non-Correlated Scalar Subquery):内部查询独立于外部查询,并且只返回一个单一的值作为外部查询的结果。
FROM 子查询:将一个完整的 SQL 查询嵌套在另一个查询的 FROM 子句中,作为临时表格使用。
KWDB 跨模查询支持以下联合查询:
UNION:合并多个查询结果集,并去除重复行。
UNION ALL:合并多个查询结果集,但不去除重复行。
INTERSECT:返回两个查询结果集中都存在的所有行,去除重复行。
INTERSECT ALL:返回两个查询结果集中都存在的所有行,但不去除重复行。
EXCEPT:返回第一个查询结果集中不包含在第二个结果集中的行,去除重复行。
EXCEPT ALL:返回第一个查询结果集中不包含在第二个结果集中的行,不去除重复行。
warning 说明
以下为 SQL 查询相关语句:
-- 设备数据表
CREATE TABLE device_data (
id SERIAL PRIMARY KEY,
device_id VARCHAR(50),
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
voltage FLOAT,
current FLOAT,
power FLOAT,
energy FLOAT,
power_factor FLOAT,
frequency FLOAT,
temperature FLOAT,
status VARCHAR(20),
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
-- 消费记录表
CREATE TABLE consumption_record (
id SERIAL PRIMARY KEY,
device_id VARCHAR(50),
timestamp TIMESTAMPTZ,
energy_usage FLOAT,
amount DECIMAL(10,2),
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
-- 查询设备运行状态统计
SELECT
status,
COUNT(*) as device_count,
AVG(voltage) as avg_voltage,
AVG(current) as avg_current,
AVG(power) as avg_power,
AVG(temperature) as avg_temperature
FROM device_data
WHERE timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY status;
-- 查询高能耗设备
SELECT
device_id,
SUM(energy_usage) as total_energy,
SUM(amount) as total_amount,
COUNT(*) as record_count
FROM consumption_record
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY device_id
HAVING SUM(energy_usage) > 1000
ORDER BY total_energy DESC;
-- 查询设备运行参数异常记录
SELECT
device_id,
timestamp,
voltage,
current,
power,
temperature,
status,
CASE
WHEN voltage NOT BETWEEN 210 AND 230 THEN '电压异常'
WHEN current > 100 THEN '电流过高'
WHEN power_factor < 0.85 THEN '功率因数低'
WHEN temperature > 75 THEN '温度过高'
ELSE '其他异常'
END as alarm_type
FROM device_data
WHERE timestamp >= NOW() - INTERVAL '24 hours'
AND (
voltage NOT BETWEEN 210 AND 230
OR current > 100
OR power_factor < 0.85
OR temperature > 75
)
ORDER BY timestamp DESC;
-- 查询设备能耗趋势
SELECT
DATE_TRUNC('hour', timestamp) as hour,
COUNT(DISTINCT device_id) as device_count,
SUM(energy_usage) as total_energy,
SUM(amount) as total_amount,
AVG(energy_usage) as avg_energy_per_device
FROM consumption_record
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY DATE_TRUNC('hour', timestamp)
ORDER BY hour;
-- 查询设备维护建议
SELECT
dd.device_id,
MAX(dd.timestamp) as last_update,
COUNT(*) as abnormal_count,
STRING_AGG(DISTINCT
CASE
WHEN dd.voltage NOT BETWEEN 210 AND 230 THEN '电压异常'
WHEN dd.current > 100 THEN '电流过高'
WHEN dd.power_factor < 0.85 THEN '功率因数低'
WHEN dd.temperature > 75 THEN '温度过高'
END,
', '
) as abnormal_types
FROM device_data dd
WHERE dd.timestamp >= NOW() - INTERVAL '7 days'
AND (
dd.voltage NOT BETWEEN 210 AND 230
OR dd.current > 100
OR dd.power_factor < 0.85
OR dd.temperature > 75
)
GROUP BY dd.device_id
HAVING COUNT(*) > 5
ORDER BY abnormal_count DESC;
-- 查询设备能效分析
WITH hourly_stats AS (
SELECT
device_id,
DATE_TRUNC('hour', timestamp) as hour,
SUM(energy_usage) as hourly_energy,
SUM(amount) as hourly_amount
FROM consumption_record
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY device_id, DATE_TRUNC('hour', timestamp)
)
SELECT
device_id,
AVG(hourly_energy) as avg_hourly_energy,
MAX(hourly_energy) as max_hourly_energy,
MIN(hourly_energy) as min_hourly_energy,
SUM(hourly_energy) as total_energy,
SUM(hourly_amount) as total_amount,
COUNT(*) as active_hours
FROM hourly_stats
GROUP BY device_id
HAVING COUNT(*) >= 12
ORDER BY avg_hourly_energy DESC;
复制代码
八、总结:
本文深入探讨了 KWDB 分布式多模数据库在净水机物联网 IoT 项目中的应用实践与压力测试。通过结合 Go 语言与香橙派 Orange Pi AI Pro 开发板,成功构建了模拟大量设备并发上传数据和请求操作的场景,验证了 KWDB 在处理时间序列数据方面的卓越性能。
在项目实施过程中,首先通过 pgx 驱动连接 KWDB 数据库,实现了数据的 CURD 操作。接着,针对净水机业务特点,设计了包含设备数据和消费记录的时序表,有效支持了设备状态监测和水卡消费记录同步等核心功能。
压力测试环节,通过模拟不同并发数、测试持续时间及模拟设备数量,全面评估了 KWDB 在高负载下的表现。测试结果显示,KWDB 在插入操作方面表现出色,每秒插入数据达到最高 2.2w(受限机器配置的原因,官方测试甚至可达几十万甚至上百万的级别),同时服务延迟保持在合理范围内,确保了系统的实时性和稳定性。
此外,通过 Grafana 监控平台,对 KWDB 集群及节点的各项监控指标进行了深入分析,包括 SQL 查询量、服务延迟、磁盘容量、内存使用及垃圾回收暂停时间等。这些指标数据为系统的优化和调整提供了有力支持,确保了 KWDB 在净水机物联网 IoT 项目中的高效稳定运行。
综上所述,KWDB 分布式多模数据库凭借其强大的时间序列数据处理能力和灵活的部署方式,在物联网 IoT 领域展现出广阔的应用前景。通过本文的实践与压测总结,为 KWDB 在类似项目中的推广和应用提供了宝贵的参考经验。
评论