写点什么

四行代码开启流计算之旅

作者:吴英骏
  • 2023-06-19
    北京
  • 本文字数:4578 字

    阅读完需:约 15 分钟

四行代码开启流计算之旅

十年前,我接触了一个大数据项目,叫做 Stratosphere(http://stratosphere.eu/)。这个项目用一个页面就吸引了我:使用 3 行代码便可以在单机上启动一个集群并使用 MapReduce 方式计算 WordCount 程序。要知道,在 Hadoop 盛行的年代,从安装 Hadoop 到写 WordCount 程序跑起来都要花好几个小时。当看到一个项目能够用 3 行代码便能做到同样功能的时候,很显然会令人眼前一亮。也是因为这 3 行代码,让我深度探索了这个项目,并在之后成为了这个项目的贡献者。


如今,那个曾经叫做 Stratosphere 的项目已经改名成 Apache Flink,并且成为了大数据时代最火的流计算引擎。而于之前那个 Stratosphere 不同的是,Flink 已经成为了一个庞大的项目,其复杂程度令人生畏。而那个参与 Flink 最早一版流计算引擎设计开发的我,依然向往着极简的用户体验,期望用户能够开速上手,极速体验到简单高效的流计算。带着这一信念,我与小伙伴们一起打造了云时代的流数据库RisingWave,为用户提供如使用 PostgreSQL 一般的高性能流计算体验。


在本文,我就为大家演示,如何使用 4 行代码,使用 RisingWave 开启你的流计算之旅。

什么是流计算

注:如果你对流计算已经有一定的认识,那么可以直接跳过这一段。


批计算与流计算是数据处理的两种最基本模式。在过去的 20 年中,批计算系统与流计算系统都经历了快速迭代,从单机时代到分布式时代,再从大数据时代到云时代,批计算系统与流计算系统均在架构上有了大幅改进。

批计算与流计算在本世纪发展历程。


批计算与流计算最核心的两个区别在于:

  • 批计算系统的计算由用户驱动,而流计算系统的计算由事件驱动;

  • 批计算系统采用全量计算模型,而流计算系统采用增量计算模型。


不管是批计算还是流计算,它们都是往“实时”这一方向发展。如今,批系统被广泛的运用在交互式分析场景中,而流计算系统则被广泛的运用在监控、告警、自动化等场景。

实时批计算系统与实时流计算系统的对比。


RisingWave:用 PostgreSQL 的体验进行流计算

RisingWave 是一款以 Apache 2.0 协议开源的 SQL 流数据库。它使用 PostgreSQL 兼容的接口,允许用户能像操作 PostgreSQL 数据库一样进行分布式流计算。


RisingWave 最典型的场景有两个:一个是流式 ETL,另一个是流式分析。


所谓流式 ETL,便是将各类消息源(如 OLTP 数据库、消息队列、文件系统等)经过加工之后(比如进行 join、aggregation、groupby、windowing 等)实时导入到终端系统(如 OLAP 数据库、数据仓库、数据湖等分析类系统,或是导回 OLTP 数据库、消息队列、文件系统中)。在该场景中,RisingWave 可完全替换 Apache Flink。


流式ETL场景。


所谓流式分析,便是将各类消息源(如 OLTP 数据库、消息队列、文件系统等)经过加工之后(比如进行 join、aggregation、groupby、windowing 等)直接呈现在终端 BI 报表中,或是允许用户直接使用不同语言客户端库访问。在该场景中,RisingWave 可替换 Apache Flink 与 SQL/NoSQL 数据库(如 MySQL、PostgreSQL、Cassandra、Redis 等)的组合。


流式分析场景。


4 行代码部署 RisingWave

首先在 Linux 或者 Mac 的命令行窗口里执行三行命令安装并运行 RisingWave:


$ brew tap risingwavelabs/risingwave$ brew install risingwave$ risingwave playground
复制代码


然后打开一个命令行窗口,执行以下命令连接到 RisingWave:


$ psql -h localhost -p 4566 -d dev -U root
复制代码


为了易于理解,我们先尝试创建一个表,并用 INSERT 来添加一些测试数据。在实际场景中,我们需要从消息队列里拿数据,那个部分留到后面再介绍。


我们来创建一个网页浏览记录的表:


CREATE TABLE website_visits (  timestamp TIMESTAMP,  user_id VARCHAR,  page_id VARCHAR,  action VARCHAR);
复制代码


接下来我们创建一个物化视图来统计每个页面的访问量,访问者数量,以及最后访问时间。这里要插一句,基于流数据的物化视图是 RisingWave 的一个核心功能。


CREATE MATERIALIZED VIEW page_visits_mv ASSELECT page_id,       COUNT(*) AS total_visits,       COUNT(DISTINCT user_id) AS unique_visitors,       MAX(timestamp) AS last_visit_timeFROM website_visitsGROUP BY page_id;
复制代码


我们用 INSERT 来加入一些数据。


INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES  ('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),  ('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),  ('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),  ('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),  ('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');
复制代码


看一下目前的结果:


SELECT * from page_visits_mv;
-----Results page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page2 | 2 | 2 | 2023-06-13 10:04:00
page3 | 1 | 1 | 2023-06-13 10:02:00
page1 | 2 | 2 | 2023-06-13 10:03:00
(3 rows)
复制代码


让我们再插入 5 组数据:


INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES  ('2023-06-13T10:05:00Z', 'user1', 'page1', 'click'),  ('2023-06-13T10:06:00Z', 'user2', 'page2', 'scroll'),  ('2023-06-13T10:07:00Z', 'user3', 'page1', 'view'),  ('2023-06-13T10:08:00Z', 'user4', 'page2', 'view'),  ('2023-06-13T10:09:00Z', 'user5', 'page3', 'view');
复制代码


分两次插入数据是想模拟数据不停进入的过程。让我们再来看一下现在的结果:


SELECT * FROM page_visits_mv;-----Results page_id | total_visits | unique_visitors |   last_visit_time
---------+--------------+-----------------+---------------------
page1 | 4 | 3 | 2023-06-13 10:07:00
page2 | 4 | 3 | 2023-06-13 10:08:00
page3 | 2 | 2 | 2023-06-13 10:09:00
(3 rows)
复制代码


我们看到结果已经更新。如果我们处理的是真正的流数据,那么这个结果是会自动保持最新的。

实现与 Kafka 交互

鉴于流数据处理中消息队列较为常用,我们可以来看一下如何实时获取并处理 Kafka 中的数据。


如果你还没安装 Kafka,那么先到官网下载合适的压缩包(这里以 3.4.0 为例),然后解压:


$ tar -xzf kafka_2.13-3.4.0.tgz$ cd kafka_2.13-3.4.0
复制代码


下面我们来启动 Kafka。


  1. 生成一个集群 UUID:


$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
复制代码


  1. 格式化日志目录:


$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
复制代码


  1. 启动 Kafka 服务器:


$ bin/kafka-server-start.sh config/kraft/server.properties
复制代码


启动 Kafka 服务器之后,我们可以创建一个话题(topic)。


$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
复制代码


创建成功后,我们就可以从命令行直接输入消息。


先运行以下命令启动生产者程序:


$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
复制代码


待出现>符号时,我们可以输入消息了。为了方便在 RisingWave 消费数据,我们按照 JSON 格式输入数据。


{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}
复制代码


我们可以启动一个消费者程序来查看我们输入的消息。


$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
复制代码


下面我们看一下在 RisingWave 怎么来获取这个消息队列的数据。在这个场景中,RisingWave 扮演的是消息的消费者角色。我们现在回到 RisingWave 窗口(即 psql 窗口),创建一个数据源(source),这样就能与刚才创建的主题(topic)建立连接。需要注意的是,这里只是建立连接,还没真正的开始消费数据。


在创建数据源时,对于 JSON 类型的数据,我们可以直接定义 schema 来映射流数据中的相关字段。为了避免与上面的表重名,我们将数据源命名为website_visits_stream


CREATE source IF NOT EXISTS website_visits_stream (    timestamp TIMESTAMP,    user_id VARCHAR,    page_id VARCHAR,    action VARCHAR    )WITH (    connector='kafka',    topic='test',    properties.bootstrap.server='localhost:9092',    scan.startup.mode='earliest'    )ROW FORMAT JSON;
复制代码


我们需要创建一个物化视图(materialized view)来让 RisingWave 开始摄取数据并进行计算。为了便于理解,我们创建了与上面的例子中类似的物化视图。


CREATE MATERIALIZED VIEW visits_stream_mv AS    SELECT page_id,    COUNT(*) AS total_visits,    COUNT(DISTINCT user_id) AS unique_visitors,    MAX(timestamp) AS last_visit_time    FROM test    GROUP BY page_id;
复制代码


我们现在可以看一下结果:


SELECT * FROM visits_stream_mv;-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 1 | 1 | 2023-06-13 10:07:00
page2 | 3 | 2 | 2023-06-13 10:08:00
page3 | 1 | 1 | 2023-06-13 10:09:00
(3 rows)
复制代码


至此我们已经 从 Kafka 里获取数据并对数据进行了处理。

进阶:用 RisingWave 搭一个实时监控系统

在流处理的应用中,实时监控是一个较为常见的需求。你可以在对数据进行实时处理后进行实时的可视化展示。RisingWave 可以作为数据源,直接接入可视化工具(例如 Superset, Grafana 等)并将处理后的指标数据进行实时展示。鼓励各位可以尝试一下自己搭建一个流处理+可视化展示的系统。具体的步骤可以参考我们的使用场景文档。在该文档中,我们使用 RisingWave 来监控和处理系统运行指标,并实时在 Grafana 中展示。我们的演示较为简单,相信各位基于真实数据,在自己熟悉的业务场景中,能够实现丰富得多的展示效果。


使用RisingWave进行监控,并实时将结果展示在Grafana中。


总结

RisingWave 的最大特点是简洁:用户可以几乎无门槛的使用 SQL 进行分布式流计算。在性能方面,RisingWave 也远胜于 Apache Flink 等大数据时代的流计算平台。那么性能到底有多少提升?剧透一下:无状态(stateless)计算的性能提升在 10%-30% 左右,有状态(stateful)计算的性能提升在 10 倍或以上!敬请期待即将发布的性能报告。高效流计算平台,就应该从简单开始。


关于 RisingWave

RisingWave 是一个云原生 SQL 流式数据库。其旨在降低构建实时应用的门槛以及成本。

GitHub: risingwave.com/github

官网: risingwave.com

Slack:risingwave.com/slack

文档: risingwave.dev

B 站:RisingWave中文开源社区

微信公众号:RisingWave 中文开源社区

社区用户交流群:risingwave_assistant

发布于: 刚刚阅读数: 9
用户头像

吴英骏

关注

还未添加个人签名 2018-11-14 加入

RisingWave Labs 创始人 & CEO

评论

发布
暂无评论
四行代码开启流计算之旅_数据库_吴英骏_InfoQ写作社区