写点什么

Flink 消费 kafka 消息实战

作者:程序员欣宸
  • 2022 年 7 月 27 日
  • 本文字数:4689 字

    阅读完需:约 15 分钟

Flink消费kafka消息实战

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


  • 本次实战的内容是开发 Flink 应用,消费来自 kafka 的消息,进行实时计算;

环境情况

  • 本次实战用到了三台机器,它们的 IP 地址和身份如下表所示:



  • 注意:

  1. 本文的重点是 Flink,所以在 192.168.1.101 这台机器上通过 Docker 快速搭建了 kafka server 和消息生产者,只要向这台机器的消息生产者容器发起 http 请求,就能生产一条消息到 kafka;

  2. 192.168.1.104 这台机器安装了 Apache Bench,可以通过简单的命令,向 192.168.1.101 发起大量 http 请求,这样就能产生大量 kafka 消息;


  • 整体架构如下图:

操作步骤

  1. 在机器 192.168.1.101 上部署三个容器(消息生产者、zookeeper、kafka);

  2. 在机器 192.168.1.104 上安装 Apache Bench;

  3. 在机器 192.168.1.102 上配置 kafak 相关的 host;

  4. 开发 Flink 应用,部署到机器 192.168.1.102;

  5. 在机器 192.168.1.104 上发起压力测试,请求地址是消息生产者的 http 接口地址,产生大量消息;

  6. 观察 Flink 应用的处理情况;

版本信息

  1. 操作系统:Centos7

  2. docker:17.03.2-ce

  3. docker-compose:1.23.2

  4. kafka:0.11.0.3

  5. zookeeper:3.4.9

  6. JDK:1.8.0_191

  7. spring boot:1.5.9.RELEASE

  8. spring-kafka:1.3.8.RELEASE

  9. Flink:1.7

在机器 192.168.1.101 上部署三个容器(消息生产者、zookeeper、kafka)

  • 构建 kafka 相关的环境不是本文重点,因此这里利用 docker 快速实现,步骤如下:

  • 在机器 192.168.1.101 上安装 docker 和 docker-compose;

  • 创建 docker-compose.yml 文件,内容如下:


version: '2'services:  zookeeper:    image: wurstmeister/zookeeper    ports:      - "2181:2181"  kafka1:    image: wurstmeister/kafka:2.11-0.11.0.3    ports:      - "9092:9092"    environment:      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092      KAFKA_LISTENERS: PLAINTEXT://:9092      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181      KAFKA_CREATE_TOPICS: "topic001:2:1"    volumes:      - /var/run/docker.sock:/var/run/docker.sock  producer:    image: bolingcavalry/kafka01103producer:0.0.1-SNAPSHOT    ports:      - "8080:8080"
复制代码


  • 在 docker-compose.yml 所在目录执行命令**docker-compose up -d</font>,即可启动容器;

  • 如果您想了解更多 docker 环境下 kafka 消息生产者的细节,请参考《如何使用Docker内的kafka服务》

在机器 192.168.1.104 上安装 Apache Bench

  • 不同的操作系统安装 Apache Bench 的命令也不一样:


  1. ubuntu 上的安装命令**apt-get install apache2-utils</font>;

  2. centos 上的安装命令**yum install httpd-tools</font>;

源码下载

  • 接下来的实战是编写 Flink 应用的源码,您可以选择直接从 GitHub 下载这个工程的源码,地址和链接信息如下表所示:


  • 这个 git 项目中有多个文件夹,本章源码在 flinkkafkademo 这个文件夹下,如下图红框所示:

开发 Flink 应用,部署到机器 192.168.1.102


mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
复制代码


  • 根据提示,输入 groupId 为 com.bolingcavalry,artifactId 为 flinkkafkademo,其他的直接按下回车键即可使用默认值,这样就得到了一个 maven 工程:flinkkafkademo;

  • 打开工程的 pom.xml 文件,增加以下两个依赖:


<dependency>  <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>    <version>${flink.version}</version></dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version></dependency>
复制代码


  • 新增一个辅助类,用于将 kafka 消息中的内容转换成 java 对象:


/** * @Description: 解析原始消息的辅助类 * @author: willzhao E-mail: zq2599@gmail.com * @date: 2019/1/1 20:13 */public class JSONHelper {
/** * 解析消息,得到时间字段 * @param raw * @return */ public static long getTimeLongFromRawMessage(String raw){ SingleMessage singleMessage = parse(raw); return null==singleMessage ? 0L : singleMessage.getTimeLong(); }
/** * 将消息解析成对象 * @param raw * @return */ public static SingleMessage parse(String raw){ SingleMessage singleMessage = null;
if (raw != null) { singleMessage = JSONObject.parseObject(raw, SingleMessage.class); }
return singleMessage; }}
复制代码


  • SingleMessage 对象的定义:


public class SingleMessage {
private long timeLong; private String name; private String bizID; private String time; private String message;
public long getTimeLong() { return timeLong; }
public void setTimeLong(long timeLong) { this.timeLong = timeLong; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getBizID() { return bizID; }
public void setBizID(String bizID) { this.bizID = bizID; }
public String getTime() { return time; }
public void setTime(String time) { this.time = time; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }}
复制代码


  • 实时处理的操作都集中在 StreamingJob 类,源码的关键位置已经加了注释,就不再赘述了:



package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.watermark.Watermark;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import javax.annotation.Nullable;import java.util.Properties;
/** * Skeleton for a Flink Streaming Job. * * <p>For a tutorial how to write a Flink streaming application, check the * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>. * * <p>To package your application into a JAR file for execution, run * 'mvn clean package' on the command line. * * <p>If you change the name of the main class (with the public static void main(String[] args)) * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). */public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 要设置启动检查点 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props = new Properties(); props.setProperty("bootstrap.servers", "kafka1:9092"); props.setProperty("group.id", "flink-group");
//数据源配置,是一个kafka消息的消费者 FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>("topic001", new SimpleStringSchema(), props);
//增加时间水位设置类 consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String> (){ @Override public long extractTimestamp(String element, long previousElementTimestamp) { return JSONHelper.getTimeLongFromRawMessage(element); }
@Nullable @Override public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) { if (lastElement != null) { return new Watermark(JSONHelper.getTimeLongFromRawMessage(lastElement)); } return null; } });
env.addSource(consumer) //将原始消息转成Tuple2对象,保留用户名称和访问次数(每个消息访问次数为1) .flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (s, collector) -> { SingleMessage singleMessage = JSONHelper.parse(s);
if (null != singleMessage) { collector.collect(new Tuple2<>(singleMessage.getName(), 1L)); } }) //以用户名为key .keyBy(0) //时间窗口为2秒 .timeWindow(Time.seconds(2)) //将每个用户访问次数累加起来 .apply((WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>) (tuple, window, input, out) -> { long sum = 0L; for (Tuple2<String, Long> record: input) { sum += record.f1; }
Tuple2<String, Long> result = input.iterator().next(); result.f1 = sum; out.collect(result); }) //输出方式是STDOUT .print();
env.execute("Flink-Kafka demo"); }}
复制代码


  • 在 pom.xml 所在文件夹执行以下命令打包:


mvn clean package -Dmaven.test.skip=true -U
复制代码


  • 打包成功后,会在 target 目录下生成文件**flinkkafkademo-1.0-SNAPSHOT.jar</font>,将此文件提交到 Flinkserver 上,如下图:

  • 点击下图红框中的"Upload"按钮:

  • 如下图,选中刚刚上传的文件,填写类名,再点击"Submit"按钮即可启动 Job:

  • 如下图,在 Overview 页面可见正在运行的任务:

  • 现在所有服务都准备完毕,可以生产消息验证了;

在机器 192.168.1.104 上发起压力测试,产生大量消息

  • 登录部署了 Apache Bench 的机器,执行以下命令:


ab -n 10000 -c 2 http://192.168.1.101:8080/send/Jack/hello
复制代码


192.168.1.101 是消息生产者的 web 服务的地址,上述命令发起了并发数为 2 的压力测试,一共会发起一万次请求;


  • 压力测试完毕后,在 Flink 的 Task Managers 页面的 Stdout 页可以见到实时计算的统计数据,如下图:

  • 至此,Flink 消费 kafka 消息的实战就全部完成了,本次实战从消息产生到实时处理全部实现,希望在您构建基于 kafak 的实时计算环境时可以提供一些参考;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 3
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Flink消费kafka消息实战_Java_程序员欣宸_InfoQ写作社区