写点什么

手把手教你使用 Timestream 实现物联网时序数据存储和分析!

  • 2022 年 1 月 10 日
  • 本文字数:10691 字

    阅读完需:约 35 分钟

手把手教你使用 Timestream 实现物联网时序数据存储和分析!


Amazon Timestream 是一种快速、可扩展的无服务器时间序列数据库服务,适用于物联网和运营应用程序,使用该服务每天可以轻松存储和分析数万亿个事件,速度提高了 1000 倍,而成本仅为关系数据库的十分之一。通过将近期数据保留在内存中,并根据用户定义的策略将历史数据移至成本优化的存储层,Amazon Timestream 为客户节省了管理时间序列数据生命周期的时间和成本。Amazon Timestream 专门构建的查询引擎可用于访问和分析近期数据和历史数据,而无需在查询中显示指定数据是保存在内存中还是成本优化层中。Amazon Timestream 内置了时间序列分析函数,可以实现近乎实时地识别数据的趋势和模式。Amazon Timestream 是无服务器服务,可自动缩放以调整容量和性能,因此无需管理底层基础设施,可以专注于构建应用程序。


本文介绍通过 Timestream、Kinesis Stream 托管服务和 Grafana 和 Flink Connector 开源软件实现物联网(以 PM 2.5 场景为示例)时序数据实时采集、存储和分析,其中包含部署架构、环境部署、数据采集、数据存储和分析,希望当您有类似物联网时序数据存储和分析需求的时候,能从中获得启发,助力业务发展。


架构


Amazon Timestream 能够使用内置的分析函数(如平滑、近似和插值)快速分析物联网应用程序生成的时间序列数据。例如,智能家居设备制造商可以使用 Amazon Timestream 从设备传感器收集运动或温度数据,进行插值以识别没有运动的时间范围,并提醒消费者采取措施(例如减少热量)以节约能源。


本文物联网(以 PM 2.5 场景为示例),实现 PM2.5 数据实时采集、时序数据存储和实时分析, 其中架构主要分成三大部分:

  • 实时时序数据采集:通过 Python 数据采集程序结合 Kinesis Stream 和 Kinesis Data Analytics for Apache Flink connector 模拟实现从 PM 2.5 监控设备, 将数据实时采集数据到 Timestream。

  • 时序数据存储:通过 Amazon Timestream 时序数据库实现时序数据存储,设定内存和磁性存储(成本优化层)存储时长,可以实现近期数据保留在内存中,并根据用户定义的策略将历史数据移至成本优化的存储层。

  • 实时时序数据分析:通过 Grafana (安装 Timesteam For Grafana 插件)实时访问 Timestream 数据,通过 Grafana 丰富的分析图表形式,结合 Amazon Timestream 内置的时间序列分析函数,可以实现近乎实时地识别物联网数据的趋势和模式。

具体的架构图如下:


部署环境


1.1 创建 Cloudformation

请使用自己帐号 (region 请选择 us-east-1)

下载 Cloudformation Yaml 文件:

https://bigdata-bingbing.s3-ap-northeast-1.amazonaws.com/timestream-short-new.yaml



其它都选择缺省, 点击 Create Stack button.


Cloud Formation 创建成功


1.2 连接到新建的 Ec2 堡垒机:

修改证书文件权限

chmod 0600 [path to downloaded .pem file]

ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]

执行 aws configure:

aws configure

default region name, 输入:“us-east-1”,其它选择缺省设置。


1.3 连接到 EC2 堡垒机 安装相应软件

设置时区

TZ='Asia/Shanghai'; export TZ
复制代码


Install python3

sudo yum install -y python3
复制代码


Install python3 pip

sudo yum install -y python3-pip
复制代码


pip3 install boto3

sudo pip3 install boto3
复制代码


pip3 install numpy

sudo pip3 install numpy
复制代码


install git

sudo yum install -y git
复制代码


1.4 下载 Github Timesteram Sample 程序库

git clone https://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools
复制代码


1.5 安装 Grafana Server

连接到 EC2 堡垒机:

sudo vi /etc/yum.repos.d/grafana.repo
复制代码


For OSS releases:(拷贝以下内容到 grafana.repo)

[grafana]name=grafanabaseurl=https://packages.grafana.com/oss/rpmrepo_gpgcheck=1enabled=1gpgcheck=1gpgkey=https://packages.grafana.com/gpg.keysslverify=1sslcacert=/etc/pki/tls/certs/ca-bundle.crt
复制代码


安装 grafana server:

sudo yum install -y grafana
复制代码


启动 grafana server:

sudo service grafana-server startsudo service grafana-server status
复制代码


配置 grafana server 在操作系统启动时 自动启动:

sudo /sbin/chkconfig --add grafana-server
复制代码


1.6 安装 timestream Plugin

sudo grafana-cli plugins install grafana-timestream-datasource
复制代码


重启 grafana

sudo service grafana-server restart
复制代码


1.7 配置 Grafana 要访问 Timesteam 服务所用的 IAM Role

获取 IAM Role Name

选择 IAM 服务, 选择要修改的 role, role name:

timestream-iot-grafanaEC2rolelabview-us-east-1

修改 role trust relationship:


将 Policy document 全部选中, 替换成以下内容:

{  "Version": "2012-10-17",  "Statement": [    {      "Sid":"",      "Effect": "Allow",      "Principal": {        "Service": "ec2.amazonaws.com"      },      "Action": "sts:AssumeRole"    },    {      "Sid":"",      "Effect": "Allow",      "Principal": {        "AWS": "[请替换成CloudFormation output中的role arn]"      },      "Action": "sts:AssumeRole"    }   ]}
复制代码


修改后的 trust relationship:


1.8 登录到 Grafana server

第一次登录到 Grafana Server:

1. 打开浏览器,访问 http://[Grafana server public ip]:3000

2. 缺省的 Grafana Server 监听端口是:3000 


如何获取 Ec2 Public IP 地址, 如下图所示, 访问 Cloudformation output:

3. 在登陆界面, 输入 username: admin; password:admin.(输入用户名和密码都是 admin)

4. 点击 Log In.登陆成功后, 会收到提示修改密码


1.9 Grafana server 中增加 Timestream 数据源

增加 Timestream 数据源


1.10 Grafana server 中配置 Timestream 数据源

拷贝配置所需要 role ARN 信息 (从 cloudformation output tab)Default Region: us-east-1



IoT 数据存储


2.1 创建 Timestream 数据库 iot



2.2 创建 Timestream 表 pm25


IoT 数据导入


3.1 安装 Flink connector to Timestream

安装 java8

sudo yum install -y java-1.8.0-openjdk*
复制代码


java -version
复制代码


安装 debug info, otherwise jmap will throw exception

sudo yum  --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo
复制代码


Install maven

sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo sudo yum install -y apache-maven mvn --version 
复制代码


change java version from 1.7 to 1.8

sudo update-alternatives --config java
复制代码


sudo update-alternatives --config javac
复制代码


安装 Apache Flink

最新的 Apache Flink 版本支持 Kinesis Data Analytics 是 1.8.2.

1. Create flink folder

cd
复制代码


mkdir flink
复制代码


cd flink
复制代码


2. 下载 Apache Flink version 1.8.2 源代码:

wget https://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz
复制代码


3. 解压 Apache Flink 源代码:

tar -xvf flink-1.8.2-src.tgz
复制代码


4. 进入到 Apache Flink 源代码目录:

cd flink-1.8.2
复制代码


5. Compile and install Apache Flink (这个编译时间比较长 需要大致 20 分钟):

mvn clean install -Pinclude-kinesis -DskipTests
复制代码


3.2 创建 Kinesis Data Stream Timestreampm25Stream

aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1
复制代码


3.3 运行 Flink Connector 建立 Kinesis 连接到 Timestream:

cdcd amazon-timestream-tools/integrations/flink_connectormvn clean compile
复制代码


数据采集过程中 请持续运行以下命令:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"
复制代码


3.4 准备 PM2.5 演示数据:

连接到 EC2 堡垒机

1. 下载 5 演示数据生成程序:

cdmkdir pm25cd pm25wget https://bigdata-bingbing.s3-ap-northeast-1.amazonaws.com/pm25_new_kinisis_test.py .
复制代码


2. 运行 5 演示数据生成程序 (python 程序 2 个参数 –region default: us-east-1; –stream default: Timestreampm25Stream)

数据采集过程中 请持续运行以下命令

python3 pm25_new_kinisis_test.py
复制代码


IoT 数据分析


4.1 登陆到 Grafana Server 创建仪表板和 Panel

创建 Dashboard 查询时 请设定时区为本地浏览器时区:

创建新的 Panel:

选择要访问的数据源,将要查询分析所执行的 SQL 语句粘贴到新的 Panel 中:


4.2 创建时间数据分析仪表版 Dashboard PM2.5 Analysis 1(Save as PM2.5 Analysis 1)


4.2.1 查询北京各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'fengtai_xiaotun' THEN avg_pm25 ELSE NULL END AS fengtai_xiaotou,CASE WHEN location = 'fengtai_yungang' THEN avg_pm25 ELSE NULL END AS fengtai_yungang,CASE WHEN location = 'daxing' THEN avg_pm25 ELSE NULL END AS daxing,CASE WHEN location = 'wanshou' THEN avg_pm25 ELSE NULL END AS wanshou,CASE WHEN location = 'gucheng' THEN avg_pm25 ELSE NULL END AS gucheng,CASE WHEN location = 'tiantan' THEN avg_pm25 ELSE NULL END AS tiantan,CASE WHEN location = 'yanshan' THEN avg_pm25 ELSE NULL END AS yanshan,CASE WHEN location = 'miyun' THEN avg_pm25 ELSE NULL END AS miyun,CASE WHEN location = 'changping' THEN avg_pm25 ELSE NULL END AS changping,CASE WHEN location = 'aoti' THEN avg_pm25 ELSE NULL END AS aoti,CASE WHEN location = 'mengtougou' THEN avg_pm25 ELSE NULL END AS mentougou,CASE WHEN location = 'huairou' THEN avg_pm25 ELSE NULL END AS huairou,CASE WHEN location = 'haidian' THEN avg_pm25 ELSE NULL END AS haidian,CASE WHEN location = 'nongzhan' THEN avg_pm25 ELSE NULL END AS nongzhan,CASE WHEN location = 'tongzhou' THEN avg_pm25 ELSE NULL END AS tongzhou,CASE WHEN location = 'dingling' THEN avg_pm25 ELSE NULL END AS dingling,CASE WHEN location = 'yanqing' THEN avg_pm25 ELSE NULL END AS yanqing,CASE WHEN location = 'guanyuan' THEN avg_pm25 ELSE NULL END AS guanyuan,CASE WHEN location = 'dongsi' THEN avg_pm25 ELSE NULL END AS dongsi,CASE WHEN location = 'shunyi' THEN avg_pm25 ELSE NULL END AS shunyiFROM (SELECT location, round(avg(measure_value::bigint),0) as avg_pm25FROM "iot"."pm25" where measure_name='pm2.5' and city='Beijing'and time >= ago(30s)group by location,bin(time,30s)order by avg_pm25 desc)
复制代码


选择图形显示 select Gauge


Save Panel as Beijing PM2.5 analysis

Edit Panel Title:Beijing PM2.5 analysis


Save Dashboard PM2.5 analysis 1:


4.2.2 查询上海一天内各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian, CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory, CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing, CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang,  CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo,  CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan, CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia, CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou, CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading, CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang, CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang, CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu, CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan, CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongmingFrom(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25FROM "iot"."pm25" where measure_name='pm2.5' and city='Shanghai'and time >= ago(30s)group by location,bin(time,30s)order by avg_pm25 desc)
复制代码


Save Panel as Shanghai PM2.5 analysis

Edit Panel Title:Shanghai PM2.5 analysis

Save Dashboard PM2.5 analysis 1


4.2.3 查询广州各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school, CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station, CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street, CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school, CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu, CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha, CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west, CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town, CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu, CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun, CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain, CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua, CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadufrom(    SELECT location, round(avg(measure_value::bigint),0) as avg_pm25FROM "iot"."pm25" where measure_name='pm2.5' and city='Guangzhou'and time >= ago(30s)group by location,bin(time,30s)order by avg_pm25 desc)
复制代码


Save Panel as Guangzhou PM2.5 analysis

Edit Panel Title:Guangzhou PM2.5 analysis

Save Dashboard PM2.5 analysis 1


4.2.4 查询深圳各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city, CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS MeishaFrom(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25FROM "iot"."pm25" where measure_name='pm2.5' and city='Shenzhen'and time >= ago(30s)group by location,bin(time,30s)order by avg_pm25 desc)
复制代码


Save Panel as Shenzhen PM2.5 analysis

Edit Panel Title:Shenzhen PM2.5 analysis

Save Dashboard PM2.5 analysis 1


4.2.5 深圳华侨城时间序列分析(最近 5 分钟内 PM2.5 分析)

New Panel

select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25where measure_name='pm2.5' and location='huaqiao city'and time >= ago(5m)GROUP BY location
复制代码


选择图形显示 select Lines; Select Points:

Save Panel as Shen Zhen Huaqiao City PM2.5 analysis

Edit Panel Title:深圳华侨城最近 5 分钟 PM2.5 分析

Save Dashboard PM2.5 analysis 1


4.2.6 找出过去 2 小时内深圳华侨城以 30 秒为间隔的平均 PM2.5 值 (使用线性插值填充缺失的值)

New Panel

WITH binned_timeseries AS (    SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25    FROM "iot".pm25    WHERE measure_name = 'pm2.5'        AND location='huaqiao city'        AND time > ago(2h)    GROUP BY location, BIN(time, 30s)), interpolated_timeseries AS (    SELECT location,        INTERPOLATE_LINEAR(            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25    FROM binned_timeseries    GROUP BY location)SELECT time, ROUND(value, 2) AS interpolated_avg_PM25FROM interpolated_timeseriesCROSS JOIN UNNEST(interpolated_avg_PM25)
复制代码


选择图形显示 select Lines:

Save Panel as Shen Zhen Huaqiao City PM2.5 analysis 1

Edit Panel Title:过去 2 小时深圳华侨城平均 PM2.5 值 (使用线性插值填充缺失值)

Save Dashboard PM2.5 analysis 1


4.2.7 过去 5 分钟内所有城市 PM2.5 平均值排名 (线性插值)

New Panel

SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasafrom(WITH binned_timeseries AS (    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25    FROM "iot".pm25    WHERE measure_name = 'pm2.5'        AND time > ago(5m)    GROUP BY city,location, BIN(time, 30s)), interpolated_timeseries AS (    SELECT city,location,        INTERPOLATE_LINEAR(            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25    FROM binned_timeseries    GROUP BY city,location), all_location_interpolated as (SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25FROM interpolated_timeseriesCROSS JOIN UNNEST(interpolated_avg_PM25))select city,avg(interpolated_avg_PM25) AS inter_avg_PM25from all_location_interpolatedgroup by cityorder by avg(interpolated_avg_PM25) desc)
复制代码


选择 Panel 图形类型:


Save Panel as all city analysis 1

Edit Panel Title:过去 5 分钟所有城市 PM2.5 平均值

Save Dashboard PM2.5 analysis 1


4.2.8 过去 5 分钟内 PM2.5 最高的十个采集点(线性插值)

New Panel

WITH binned_timeseries AS (    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25    FROM "iot".pm25    WHERE measure_name = 'pm2.5'        AND time > ago(5m)    GROUP BY city,location, BIN(time, 30s)), interpolated_timeseries AS (    SELECT city,location,        INTERPOLATE_LINEAR(            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s))                 AS interpolated_avg_PM25    FROM binned_timeseries    GROUP BY city,location), interpolated_cross_join as (SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25FROM interpolated_timeseriesCROSS JOIN UNNEST(interpolated_avg_PM25))select city,location, avg(interpolated_avg_PM25) as avg_PM25_locfrom interpolated_cross_joingroup by city,locationorder by avg_PM25_loc desclimit 10
复制代码


选择 Table

Save Panel as all city analysis 2

Edit Panel Title:过去 5 分钟内 PM2.5 最高的十个采集点(线性插值)

Save Dashboard PM2.5 analysis 1


4.2.9 过去 5 分钟内 PM2.5 最低的十个采集点(线性插值)

New Panel

WITH binned_timeseries AS (    SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25    FROM "iot".pm25    WHERE measure_name = 'pm2.5'        AND time > ago(5m)    GROUP BY city,location, BIN(time, 30s)), interpolated_timeseries AS (    SELECT city,location,        INTERPOLATE_LINEAR(            CREATE_TIME_SERIES(binned_timestamp, avg_PM25),                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s))                 AS interpolated_avg_PM25    FROM binned_timeseries    GROUP BY city,location), interpolated_cross_join as (SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25FROM interpolated_timeseriesCROSS JOIN UNNEST(interpolated_avg_PM25))select city,location, avg(interpolated_avg_PM25) as avg_PM25_locfrom interpolated_cross_joingroup by city,locationorder by avg_PM25_loc asclimit 10
复制代码


选择 Table

Save Panel as all city analysis 3

Edit Panel Title:过去 5 分钟内 PM2.5 最低的十个采集点(线性插值)

Save Dashboard PM2.5 analysis 1

设置仪表板 每 5 秒钟刷新一次:



本 blog 着重介绍通过 Timestream、Kinesis Stream 托管服务和 Grafana 实现物联网(以 PM 2.5 场景为示例)时序数据实时采集、存储和分析,其中包含部署架构、环境部署、数据采集、数据存储和分析,希望当您有类似物联网时序数据存储和分析需求的时候,有所启发,实现海量物联网时序数据高效管理、挖掘物联网数据中蕴含的规律、模式和价值,助力业务发展。


附录:

《Amazon Timestream 开发人员指南》

https://docs.aws.amazon.com/zh_cn/timestream/latest/developerguide/what-is-timestream.html


《Amazon Timestream 开发程序示例》

https://github.com/awslabs/amazon-timestream-tools/tree/master/sample_apps


《Amazon Timestream 与 Grafana 集成示例》

https://docs.aws.amazon.com/zh_cn/timestream/latest/developerguide/Grafana.html#Grafana.sample-app

本篇作者:刘冰冰


亚马逊云科技数据库解决方案架构师,负责基于亚马逊云科技的数据库解决方案的咨询与架构设计,同时致力于大数据方面的研究和推广。在加入亚马逊云科技之前曾在 Oracle 工作多年,在数据库云规划、设计运维调优、DR 解决方案、大数据和数仓以及企业应用等方面有丰富的经验。



用户头像

还未添加个人签名 2019.09.17 加入

还未添加个人简介

评论

发布
暂无评论
手把手教你使用 Timestream 实现物联网时序数据存储和分析!