写点什么

大数据培训如何实现集成 Kafka 与 Storm 的结合

作者:@零度
  • 2022 年 4 月 13 日
  • 本文字数:3780 字

    阅读完需:约 12 分钟

 文章转载来源于数据仓库与 Python 大数据

到目前为止,我们已经知道 Storm 的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用 Storm 做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用 Kafka 作为消息队列是非常合适的选择,Kafka 可以将不均匀的数据转换成均匀的消息流,从而和 Storm 比较完善的结合,这样才可以实现稳定的流式计算,那么接下来开发一个简单的案例来实现 Storm 和 Kafka 的结合。

Storm 和 Kafka 结合,实质上无非是之前我们说过的计算模式结合起来,就是数据先进入 Kafka 生产者,然后 Storm 作为消费者进行消费,最后将消费后的数据输出或者保存到文件、数据库、分布式存储等等,具体框如下图 1 所示:


图 1 集成 Kafka 与 Storm 的框架

Storm 从 Kafka 中接收数据

为了实现 Kafka 与 Storm 的集成,我们需要添加以下的依赖信息。

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.9.2</artifactId>

<version>0.8.2.2</version>

<exclusions>

<exclusion>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

</exclusion>

<exclusion>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

</exclusion>

</exclusions>

</dependency>

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-kafka</artifactId>

<version>1.0.3</version>

</dependency>

修改之前的 WordCountTopology 主程序,创建一个新的 KafkaSpout 组件,并将其作为 WordCountTopology 任务的输入。这 KafkaSpout 将从 Kafka 中接收消息,作为 Kafka 消息的 Consumer 使用。下面是创建这个 KafkaSpout 组件的核心代码程序_大数据培训

//支持从 Kakfa 消息系统中读取数据

private static KafkaSpout createKafkaSpout() {

//指定 ZooKeeper 的地址信息

BrokerHosts brokerHosts = new ZkHosts("kafka101:2181");

//创建 KafkaSpout 的配置信息。

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts,

"mytopic1", "/mytopic1",

UUID.randomUUID().toString());

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

//返回一个 KafkaSpout

return new KafkaSpout(spoutConfig);

}

注意:Storm 默认是从头开始读取 Kafka 的消息,如果要从 Kafka 最新的便宜地址开始读取数据,需要添加如下代码:

conf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

另一方面,由于应用程序将从 Kafka 中接收消息,由于 Kafka 发送来的消息中并没有“sentence”字段,所以需要修改一下 WordCountSplitBolt 的代码,如下:

//进行单词拆分

public class WordCountSplitBolt extends BaseRichBolt{

private OutputCollector collector;

@Override

public void execute(Tuple tuple) {

//如何处理上一级组件发来的 Tuple

//取出数据: I love Beijing

//String data = tuple.getStringByField("sentence");

String data = tuple.getString(0);

String[] words = data.split(" ");

for(String w:words) { //k2 v2

this.collector.emit(new Values(w,1));

}

}

@Override

public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {

// OutputCollector 该级组件的输出流

this.collector = collector;

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declare) {

declare.declare(new Fields("word","count"));

}

}

注意:这里的第 10 行代码和第 11 行代码的区别。

为了测试的方便,我们将 WordCountTopology 任务运行在本地模式下。下面为大家展示了完整的代码程序。

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.StormSubmitter;

import org.apache.storm.generated.AlreadyAliveException;

import org.apache.storm.generated.AuthorizationException;

import org.apache.storm.generated.InvalidTopologyException;

import org.apache.storm.generated.StormTopology;

import org.apache.storm.kafka.BrokerHosts;

import org.apache.storm.kafka.KafkaSpout;

import org.apache.storm.kafka.SpoutConfig;

import org.apache.storm.kafka.StringScheme;

import org.apache.storm.kafka.ZkHosts;

import org.apache.storm.spout.SchemeAsMultiScheme;

import org.apache.storm.topology.IRichBolt;

import org.apache.storm.topology.IRichSpout;

import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.ITuple;

public class WordCountTopology {

public static void main(String[] args) {

//创建一个 Topology

TopologyBuilder builder = new TopologyBuilder();

//指定任务的 spout 组件

//builder.setSpout("myspout", new WordCountSpout());

builder.setSpout("myspout", createKafkaSpout());

//指定拆分单词的 bolt,是随机分组

builder.setBolt("mysplit",

new WordCountSplitBolt())

.shuffleGrouping("myspout");

//指定单词计数的 bolt

builder.setBolt("mytotal",

new WordCountTotalBolt())

.fieldsGrouping("mysplit",

new Fields("word"));

//创建任务

StormTopology topology = builder.createTopology();

//配置参数

Config conf = new Config();

//本地模式

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("MyWC", conf, topology);

}

private static IRichSpout createKafkaSpout() {

BrokerHosts zkHost = new ZkHosts("kafka101:2181");

SpoutConfig conf = new SpoutConfig(zkHost,

"mytopic1",

"/mytopic1",

"mygroupID");

//指定反序列化机制

conf.scheme = new SchemeAsMultiScheme(new StringScheme());

conf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

return new KafkaSpout(conf);

}

}

测试 Kafka 与 Storm 的集成

(1) 在 kafka101、kafka102 和 kafka103 上启动 ZooKeeper 集群,执行下面的命令 :

zkServer.sh start

(2) 启动 Kafka 集群

(3) 执行下面的命令,启动 Kafka Producer Console

bin/kafka-console-producer.sh --broker-list kafka101:9092--topic mytopic1

(4) 启动 Storm 应用程序,如下图 2 所示:


图 2 启动 Storm 应用程序

(5)在 Kafka Producer Console 输入一些字符串,并回车发送。观察 Storm 应用程序的结果。可以看到当在 Kafka Producer Console 上输入了数据,在 Storm 的应用程序中将实时接收消息,并处理消息。如下图 3 所示:


图 3 Storm 应用程序接收 Kafka 的消息

Storm 将数据输出到 Kafka

在前面的例子中,我们集成了 Storm 和 Kafka。将 Kafka 作为 Storm 的 Spout,Storm 将从 Kafka 中接收数据,并处理数据。其实还有另一种情况,就是 Storm 处理完成后,也可以将数据输出到 Kafka 中,下图描述了这一过程。


图 4 Storm 应用程序发送消息到 Kafka

在了解上面的过程后,我们可以改造一下之前的 Topology 主程序的代码,创建一个新的 Bolt 任务,将任务处理完成的数据输出到 Kafka 中,下面列出了改造后的代码程序。

private static IRichBolt createKafkaBolt() {

Properties props = new Properties();

//broker 的地址

props.put("bootstrap.servers", "kafka101:9092");

//说明了使用何种序列化方式将用户提供的 key 和 vaule 值序列化成字节。

props.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

props.put("acks", "1");

KafkaBolt<String, String> bolt = new KafkaBolt<String, String>();

//指定 Kafka 的配置信息

bolt.withProducerProperties(props);

//指定 Topic 的名字

bolt.withTopicSelector(new DefaultTopicSelector("mytopic1"));

//指定将上一级 Bolt 处理完成后的 Key 和 Value//

//KafkaBolt 将会按照这里指定的 Key 和 Value 将数据发送到 Kakfa

bolt.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper

<String, String>("word","total"));

return bolt;

}

创建好了新的 Bolt 组件后,可以将其添加进入 Topology 任务中,下面列出了改造后的主程序代码。

public static void main(String[] args) {

//创建一个 Topology

TopologyBuilder builder = new TopologyBuilder();

//指定任务的 spout 组件

//builder.setSpout("myspout", new WordCountSpout());

builder.setSpout("myspout", createKafkaSpout());

//指定拆分单词的 bolt,是随机分组

builder.setBolt("mysplit",

new WordCountSplitBolt())

.shuffleGrouping("myspout");

//指定单词计数的 bolt

builder.setBolt("mytotal",

new WordCountTotalBolt())

.fieldsGrouping("mysplit",

new Fields("word"));

//创建 KafkaBolt,将结果发送到 Kafka 中

builder.setBolt("kafkabolt", createKafkaBolt())

.shuffleGrouping("mytotal");

//创建任务

StormTopology topology = builder.createTopology();

//配置参数

Config conf = new Config();

//本地模式

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("MyWC", conf, topology);

}

注意:这里的第 21 行代码和第 22 行代码,我们使用了之前的 createKafkaBolt 方法,将 KafkaBolt 任务添加进了 Topology 任务中。


用户头像

@零度

关注

关注尚硅谷,轻松学IT 2021.11.23 加入

IT培训 www.atguigu.com

评论

发布
暂无评论
大数据培训如何实现集成Kafka与Storm的结合_kafka_@零度_InfoQ写作平台