写点什么

博文推荐|使用 Apache Pulsar 构建边缘应用程序

作者:Apache Pulsar
  • 2022 年 3 月 01 日
  • 本文字数:5890 字

    阅读完需:约 19 分钟

本文由 StreamNative 组织 Apache Pulsar 中文社区志愿者翻译。原文来自 StreamNative 英文博客《Building Edge Applications With Apache Pulsar》,作者 Tim Spann,StreamNative 布道师。 译者:YOLO,就职于 BSC BOMC ORP 的 bomc 团队。

原文链接:https://streamnative.io/blog/engineering/2021-11-17-building-edge-applications-with-apache-pulsar/

近年来,远程连接设备的爆炸性增长为集中式计算范式带来了挑战。受到网络和基础设施的限制,企业越来越难以在不出现延迟或性能问题的情况下,在数据中心或云中移动和处理所有设备生成的数据。因此,边缘应用程序逐渐兴起。据 Gartner[1]估计,到 2025 年,企业将在数据中心或云之外创建和处理 75% 的数据。

那么什么是边缘应用程序?边缘应用程序运行在数据源上或其附近,如物联网设备、本地边缘服务器、边缘执行。边缘计算使计算、存储、缓存、管理、告警、机器学习和路由都能够在数据中心和云之外进行。零售、农业、制造、运输、医疗和电信等行业通过采用边缘应用程序,从而实现更低的延迟、更好的带宽、更低的基础设施成本和更高效的决策。

本文将为大家介绍开发边缘应用程序所面临的一些挑战,以及 Apache Pulsar 应用于边缘应用程序的解决方案。本文还将分享一个示例,逐步展示如何使用 Pulsar 构建边缘应用程序。

关键挑战

边缘计算的分散性在带来许多好处的同时也带来了挑战,其中主要包括:

  • 边缘应用程序通常需要支持各种设备、协议、语言和数据格式。

  • 来自边缘应用程序的通信需要与来自传感器、日志和应用程序的事件流以快速但不均匀的速度进行异步。

  • 数据的边缘生产者根据设计要求需要部署不同的消息传递集群。

  • 从设计上看,边缘应用程序在地理上具有分散性和多样性的特点。


解决办法

需要一个适应性强、混合、支持地理复制且可扩展的开源解决方案,以能够解决构建边缘应用程序所面临的问题。拥有众多用户的开源项目可以提供广泛的社区支持,以及边缘应用程序所需的丰富生态系统,包括适配器、连接器和扩展等。在过去二十年中,基于我与不同技术和开源项目的合作经验,我相信 Apache Pulsar 满足了边缘应用程序的需求。

Apache Pulsar 是一个开源、云原生、分布式消息流平台。自 2018 年 Pulsar 成为 Apache 软件基金会顶级项目以来,它的社区参与、周边生态增长和全球使用率都飞速增长。Pulsar 之所以能够解决边缘计算中存在的诸多挑战,归功于以下几点:

  • Apache Pulsar 支持多种 Schema 下的快速消息传递、元数据和多种数据格式。

  • Pulsar 支持 Go、C++、Java、Node.js、Websockets 和 Python 等多语言客户端。此外,还有社区开发者提供的 Haskell、Scala、Rust 和.Net 开源客户端,以及 Apache Flink 和 Apache Spark 的流处理库。

  • Pulsar 支持多种消息协议,包括 MQTT、Kafka、AMQP 和 JMS。

  • Pulsar 的跨地域复制功能解决了分布式设备的位置问题。

  • Pulsar 云原生的架构让其可以在多云、本地或 Kubernetes 环境中运行。它还可以适配小型边缘网关,以及像 NVIDIA Jetson Xavier NX 这样强大的设备。

在本示例中,我们在 NVIDIA Jetson Xavier NX 上构建边缘应用程序,它为我们运行边缘 Apache Pulsar 单机 broker、多个 web 摄像头和深度学习边缘应用程序提供了足够的能力。我的边缘设备包含 384 个 NVIDIA CUDA® 内核和 48 个 Tensor 内核、6 个 64 位 ARM 内核和 8 GB 128 位 LPDDR4x RAM。在后续博客中,我将向大家展示,即使在 Raspberry PI 4s 和 NVIDIA Jetson Nano 等更为简单的设备上运行 Pulsar,仍然可以满足快速边缘事件流的需要。

架构

上文介绍了解决方案的物理结构,那么现在的问题是如何对传入数据有逻辑地搭建应用架构。对于不熟悉 Pulsar 的人,首先需要了解到每个主题都属于租户和命名空间,如下图所示。


这些逻辑结构支持我们根据各种标准(如数据的原始来源和不同的业务)将数据进行分组。一旦我们决定了租户、命名空间和主题,我们就需要确定收集分析所需的额外数据所需字段。


接下来,我们需要确定数据的格式。根据不同的架构,它可以与原始格式相同,也可以根据传输、处理或存储的具体要求进行转换。此外,在许多情况下,我们的设备、设施、传感器、操作系统或传输模式会要求我们选择特定的数据格式。


在本文中,我们将使用 JSON 数据格式,它可以满足几乎任何语言和多数人的可读性需求。此外,Apache Avro 作为一种二进制格式,也不失为一个不错的选择,但我的系列博客会选择最简单的格式。

选定数据格式之后,我们可能需要在传感器、机器学习分类、日志或其他来源之外添加额外字段来丰富原始数据。我喜欢添加 IP 地址、mac 地址、主机名、创建时间戳、执行时间,以及一些关于设备运行状况的字段,如磁盘空间、内存和 CPU。如果您认为没有必要,或者您的设备已经广播设备运行状况,则可以适当增减。尤其是当拥有数千台设备时,这些字段可以帮助我们调试程序。因此,我习惯在带宽允许的情况下添加这些数据。


我们需要为事件记录找到主键或唯一标识符,物联网数据通常没有自带。我们可以在创建记录时用 UUID 生成器合成一个。


拥有个字段列表后,我们需要为数据设置一个 schema,并确定字段名、类型、默认值和是否为空。一旦定义了一个 schema,我们就可以使用 JSON schema 或使用字段构建一个类,进而使用 Pulsar SQL 查询主题中的数据。对于物联网应用程序而言,通常会用到此类事件的时间序列主数据存储。我推荐 Aerospike、InfluxDB 或 ScyllaDB。我们可以根据场景和需求使用 Pulsar IO Sink 连接器或其他机制。必要时,我们还可以使用 Spark 连接器、Flink 连接器或 NiFi 连接器。


我们的最终事件会和如下所示的 JSON 示例类似。

{"uuid": "xav_uuid_video0_lmj_20211027011044", "camera": "/dev/video0", "ipaddress": "192.168.1.70", "networktime": 4.284832000732422, "top1pct": 47.265625, "top1": "spotlight, spot", "cputemp": "29.0", "gputemp": "28.5", "gputempf": "83", "cputempf": "84", "runtime": "4", "host": "nvidia-desktop", "filename": "/home/nvidia/nvme/images/out_video0_tje_20211027011044.jpg", "imageinput": "/home/nvidia/nvme/images/img_video0_eqi_20211027011044.jpg", "host_name": "nvidia-desktop", "macaddress": "70:66:55:15:b4:a5", "te": "4.1648781299591064", "systemtime": "10/26/2021 21:10:48", "cpu": 11.7, "diskusage": "32367.5 MB", "memory": 82.1}
复制代码


边缘生产者

接下来我们在 NVIDIA Jetson Xavier NX 上测试一些库、语言和客户端,看看哪个最适合我们的场景。在 NVIDIA Jetson Xavier NX 版本 ARM 安装 Ubuntu 系统运行多个库进行原型设计之后,我发现了如下技术选项,它们可以生成符合我的应用程序所需的消息。对于这个边缘平台来说,这些虽然不是唯一路径,但不失为非常好的选择。

  • Go Lang Pulsar Producer

  • Python 3.x Websocket Producer

  • Python 3.x MQTT Producer

  • Java 8 Pulsar Producer

  • Go Lang Pulsar Producer

Go 语言 Pulsar 生产者

package main
import ( "context" "fmt" "log" "github.com/apache/pulsar-client-go/pulsar" "github.com/streamnative/pulsar-examples/cloud/go/ccloud" "github.com/hpcloud/tail")
func main() { client := ccloud.CreateClient()
producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "jetson-iot", }) if err != nil { log.Fatal(err) } defer producer.Close()
t, err := tail.TailFile("demo1.log", tail.Config{Follow:true}) for line := range t.Lines { if msgId, err := producer.Send(context.Background(),&pulsar.ProducerMessage{ Payload: []byte(line.Text), }); err != nil { log.Fatal(err) } else { fmt.Printf("jetson:Published message: %v-%s \n",msgId,line.Text) } }}
复制代码


Python3 Websocket 生产者

import requests, uuid, websocket, base64, json
uuid2 = uuid.uuid4()row = {}row['host'] = 'nvidia-desktop'ws = websocket.create_connection( 'ws://server:8080/ws/v2/producer/persistent/public/default/energy')message = str(json.dumps(row) )message_bytes = message.encode('ascii')base64_bytes = base64.b64encode(message_bytes)base64_message = base64_bytes.decode('ascii')ws.send(json.dumps({ 'payload' : base64_message, 'properties': { 'device' : 'jetson2gb', 'protocol' : 'websockets' },'key': str(uuid2), 'context' : 5 }))response = json.loads(ws.recv())if response['result'] == 'ok': print ('Message published successfully')else: print ('Failed to publish message:', response)ws.close()
复制代码

带有 Schema 的 Java Pulsar 生产者

public static void main(String[] args) throws Exception {        JCommanderPulsar jct = new JCommanderPulsar();        JCommander jCommander = new JCommander(jct, args);        if (jct.help) {            jCommander.usage();            return;        }        PulsarClient client = null;
if ( jct.issuerUrl != null && jct.issuerUrl.trim().length() >0 ) { try { client = PulsarClient.builder() .serviceUrl(jct.serviceUrl.toString()) .authentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl.toString()),new URL(jct.credentialsUrl.toString()), jct.audience.toString())).build(); } catch (PulsarClientException e) { e.printStackTrace(); } catch (MalformedURLException e) { e.printStackTrace(); } } else { try { client = PulsarClient.builder().serviceUrl(jct.serviceUrl.toString()).build(); } catch (PulsarClientException e) { e.printStackTrace(); } }
UUID uuidKey = UUID.randomUUID(); String pulsarKey = uuidKey.toString(); String OS = System.getProperty("os.name").toLowerCase(); String message = "" + jct.message; IoTMessage iotMessage = parseMessage("" + jct.message); String topic = DEFAULT_TOPIC; if ( jct.topic != null && jct.topic.trim().length()>0) { topic = jct.topic.trim(); } ProducerBuilder<IoTMessage> producerBuilder = client.newProducer(JSONSchema.of(IoTMessage.class)) .topic(topic) .producerName("jetson"). sendTimeout(5, TimeUnit.SECONDS);
Producer<IoTMessage> producer = producerBuilder.create();
MessageId msgID = producer.newMessage() .key(iotMessage.getUuid()) .value(iotMessage) .property("device", OS) .property("uuid2", pulsarKey) .send(); producer.close(); client.close(); producer = null; client = null; }
private static IoTMessage parseMessage(String message) {
IoTMessage iotMessage = null;
try { if ( message != null && message.trim().length() > 0) { ObjectMapper mapper = new ObjectMapper(); iotMessage = mapper.readValue(message, IoTMessage.class); mapper = null; } } catch(Throwable t) { t.printStackTrace(); }
if (iotMessage == null) { iotMessage = new IoTMessage(); } return iotMessage; }
java -jar target/IoTProducer-1.0-jar-with-dependencies.jar --serviceUrl pulsar://nvidia-desktop:6650 --topic 'iotjetsonjson' --message "...JSON…"
复制代码

你可以在这里找到所有的源代码。

现在,我们决定如何在设备上执行应用程序。可以使用系统附带的调度器,如 cron 或一些附加组件。作为参考,我经常使用 cron、MiNiFi 代理、Shell 脚本,或者将应用程序作为服务连续运行。您需要自行配置您的设备和传感器以获得最佳调度。

验证数据并进行监控

现在,我们有了源源不断的事件流进入我们的 Pulsar 集群,我们可以验证数据并监控进展。以 StreamNative Cloud Manager 界面为例,如下图所示。我们还可以选择查看此处记录的 Pulsar 指标端点。


通过 REST 检查统计数据

  • http://:8080/admin/v2/persistent/public/default/mqtt-2/stats

  • http://:8080/admin/v2/persistent/public/default/mqtt-2/internalStats

通过 Admin CLI 检查统计信息

bin/pulsar-admin topics stats-internal persistent://public/default/mqtt-2
复制代码


查找主题所在的订阅

http://nvidia-desktop:8080/admin/v2/persistent/public/default/mqtt-2/subscriptions

通过 REST 从订阅中消费

http://nvidia-desktop:8080/admin/v2/persistent/public/default/mqtt-2/subscription/mqtt2/position/10

通过 CLI 消费消息

bin/pulsar-client consume "persistent://public/default/mqtt-2" -s "mqtt2" -n 5
复制代码

通过 Pulsar SQL 查询主题

select * from pulsar."public/default".iotjetsonjson;


后续步骤

现在,我们已经构建了一个边缘应用程序,它可以以事件速度传输数据,并将数千个其他应用程序的流数据连接到 Apache Pulsar 集群中。接下来,我们可以使用 Flink SQL 添加丰富的实时分析。由此我们可以进行高级流处理、整合事件流以及进行大规模数据处理。

延伸阅读

如果您有兴趣了解有关边缘应用的更多信息并构建自己的连接器,请参阅以下资源:



关注公众号「Apache Pulsar」,获取更多技术干货


加入 Apache Pulsar 中文交流群


用户头像

Apache Pulsar

关注

下一代云原生分布式消息流平台 2017.10.17 加入

Apache 软件基金会顶级项目,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展流数据存储特性。

评论

发布
暂无评论
博文推荐|使用 Apache Pulsar 构建边缘应用程序_开源_Apache Pulsar_InfoQ写作平台