如何使用 Docker 内的 kafka 服务
欢迎访问我的 GitHub
这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
基于 Docker 可以很轻松的搭建一个 kafka 集群,其他机器上的应用如何使用这个 kafka 集群服务呢?本次实战就来解决这个问题。
基本情况
整个实战环境一共有三台机器,各自的职责如下图所示:
整个环境的部署情况如下图:
版本信息
操作系统:Centos7
docker:17.03.2-ce
docker-compose:1.23.2
kafka:0.11.0.3
zookeeper:3.4.9
JDK:1.8.0_191
spring boot:1.5.9.RELEASE
spring-kafka:1.3.8.RELEASE
重点介绍
本次实战有几处重点需要注意:
spring-kafka 和 kafka 的版本匹配问题,请关注官方文档:https://spring.io/projects/spring-kafka
kafka 的 kafka 的 advertised.listeners 配置,应用通过此配置来连接 broker;
应用所在服务器要配置 host,才能连接到 broker;
接下来开始实战吧;
配置 host
为了让生产和消费消息的应用能够连接 kafka 成功,需要配置应用所在服务器的/etc/hosts 文件,增加以下一行内容:
192.168.1.101 是 docker 所在机器的 IP 地址;
请注意,生产和消费消息的应用所在服务器都要做上述配置;
可能有的读者在此会有疑问:为什么要配置 host 呢?我把 kafka 配置的 advertised.listeners 配置成 kafka 的 IP 地址不就行了么?这样的配置我试过,但是用 kafka-console-producer.sh 和 kafka-console-consumer.sh 连接 kafka 的时候会报错"LEADER_NOT_AVAILABLE"。
在 docker 上部署 kafka
在 docker 机器上编写 docker-compose.yml 文件,内容如下:
上述配置中有两处需要注意:
第一,KAFKA_ADVERTISED_LISTENERS 的配置,这个参数会写到 kafka 配置的 advertised.listeners 这一项中,应用会用来连接 broker;
第二,KAFKA_CREATE_TOPICS 的配置,表示容器启动时会创建名为"topic001"的主题,并且 partition 等于 2,副本为 1;
在 docker-compose.yml 所在目录执行命令 docker-compose up -d,启动容器;
执行命令 docker ps,可见容器情况,kafka 的容器名为 temp_kafka1_1:
执行以下命令可以查看 topic001 的基本情况:
看到的信息如下:
源码下载
接下来的实战是编写生产消息和消费消息的两个应用的源码,您可以选择直接从 GitHub 下载这两个工程的源码,地址和链接信息如下表所示:
这个 git 项目中有多个文件夹,本章源码在 kafka01103consumer 和 kafka01103producer 这两个文件夹下,如下图红框所示:
接下来开始编码:
开发生产消息的应用
创建一个 maven 工程,pom.xml 内容如下:
再次强调 spring-kafka 版本和 kafka 版本的匹配很重要;
配置文件 application.properties 内容:
发送消息的业务代码只有一个 MessageController 类:
编码完成后,在 pom.xml 所在目录执行命令 mvn clean package -U -DskipTests,即可在 target 目录下发现文件 kafka01103producer-0.0.1-SNAPSHOT.jar,将此文件复制到 192.168.1.102 机器上;
登录 192.168.1.102,在文件 kafka01103producer-0.0.1-SNAPSHOT.jar 所在目录执行命令 java -jar kafka01103producer-0.0.1-SNAPSHOT.jar,即可启动生产消息的应用;
开发消费消息的应用
创建一个 maven 工程,pom.xml 内容如下:
再次强调 spring-kafka 版本和 kafka 版本的匹配很重要;
配置文件 application.properties 内容:
消费消息的业务代码只有一个 Consumer 类,收到消息后,会将内容内容和消息的详情打印出来:
编码完成后,在 pom.xml 所在目录执行命令 mvn clean package -U -DskipTests,即可在 target 目录下发现文件 kafka01103consumer-0.0.1-SNAPSHOT.jar,将此文件复制到 192.168.1.104 机器上;
登录 192.168.1.104,在文件 kafka01103consumer-0.0.1-SNAPSHOT.jar 所在目录执行命令 java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar,即可启动消费消息的应用,控制台输出如下:
上述内容显示了当前应用消费了两个 partition;
再启动一个同样的应用,这样每个应用负责一个 parititon 的消费,做法是在文件 kafka01103consumer-0.0.1-SNAPSHOT.jar 所在目录执行命令 java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar --server.port=8081,看看控制台的输出:
可见新的进程消费的是 0 号 partition,此时再去看看先启动的进程的控制台,见到了新的日志,显示该进程只消费 1 号 pairtition 了:
验证消息的生产和消费
在浏览器输入以下地址:192.168.1.102:8080/send/Tom/hello
浏览器显示返回的结果是:send message to [Tom] success (2019-01-01 13:58:08),表示操作成功;
去检查两个消费者进程的控制台,发现其中一个成功的消费了消息,如下:
至此,外部应用使用基于 Docker 的 kafa 服务实战就完成了,如果您也在用 Docker 部署 kafka 服务,给外部应用使用,希望本文能给您提供一些参考;
欢迎关注 InfoQ:程序员欣宸
版权声明: 本文为 InfoQ 作者【程序员欣宸】的原创文章。
原文链接:【http://xie.infoq.cn/article/713af824848990ff147fb2e47】。文章转载请联系作者。
评论