
How to use the kafka service inside Docker

Author: 程序员欣宸
  • July 26, 2022


Welcome to my GitHub

All of 欣宸's original articles (with companion source code) are categorized and indexed here: https://github.com/zq2599/blog_demos


  • Docker makes it easy to stand up a kafka cluster. But how do applications on other machines use that kafka service? This hands-on article solves exactly that problem.

Overview

  • The environment for this exercise consists of three machines, whose roles are shown in the figure below:



  • The overall deployment of the environment is shown in the figure below:

Version information

  1. OS: CentOS 7

  2. Docker: 17.03.2-ce

  3. docker-compose: 1.23.2

  4. kafka: 0.11.0.3

  5. zookeeper: 3.4.9

  6. JDK: 1.8.0_191

  7. Spring Boot: 1.5.9.RELEASE

  8. spring-kafka: 1.3.8.RELEASE

Key points

  • There are a few points in this exercise that need special attention:


  1. the spring-kafka version must match the kafka version; see the official documentation: https://spring.io/projects/spring-kafka

  2. kafka's advertised.listeners setting, which the applications use to connect to the broker;

  3. the servers running the applications must have a hosts entry configured, otherwise they cannot connect to the broker;


  • Let's get started.

Configure the hosts file

  • To let the producer and consumer applications connect to kafka, add the following line to the /etc/hosts file on the servers where those applications run:


192.168.1.101 kafka1


  • 192.168.1.101 is the IP address of the machine running Docker;

  • Note that this entry is needed on both the producer server and the consumer server;

  • Some readers may ask: why configure hosts at all? Why not just set kafka's advertised.listeners to kafka's IP address? I tried that configuration, but kafka-console-producer.sh and kafka-console-consumer.sh then failed with a "LEADER_NOT_AVAILABLE" error when connecting to kafka. A quick connectivity check with those console tools is sketched below.
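
For a quick connectivity check from an application server, the same console tools can be pointed at the broker. This is only a sketch: it assumes a Kafka 0.11 binary distribution has been unpacked at ~/kafka_2.11-0.11.0.3 on the client machine (any path works), which the article itself does not otherwise require.

# send a few test lines to topic001 (Ctrl+C to quit)
~/kafka_2.11-0.11.0.3/bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic topic001

# in another shell, read everything back from the beginning
~/kafka_2.11-0.11.0.3/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic001 --from-beginning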

Deploy kafka on Docker

  • On the Docker machine, create a docker-compose.yml file with the following content:


version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    image: wurstmeister/kafka:2.11-0.11.0.3
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic001:2:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock


  • Two parts of this configuration deserve attention:

  • First, KAFKA_ADVERTISED_LISTENERS: this value is written into kafka's advertised.listeners setting, which the applications use to connect to the broker (a way to verify it inside the container is sketched after the docker ps output below);

  • Second, KAFKA_CREATE_TOPICS: the container creates a topic named "topic001" at startup, with 2 partitions and a replication factor of 1 (an equivalent manual command is sketched at the end of this section);

  • In the directory containing docker-compose.yml, run docker-compose up -d to start the containers;

  • Run docker ps to check the containers; the kafka container is named temp_kafka1_1:


[root@hedy temp]# docker ps
CONTAINER ID        IMAGE                              COMMAND                  CREATED             STATUS              PORTS                                                NAMES
ba5374d6245c        wurstmeister/zookeeper             "/bin/sh -c '/usr/..."   About an hour ago   Up About an hour    22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   temp_zookeeper_1
2c58f46bb772        wurstmeister/kafka:2.11-0.11.0.3   "start-kafka.sh"         About an hour ago   Up About an hour    0.0.0.0:9092->9092/tcp                               temp_kafka1_1
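
To double-check what the container actually wrote into kafka's server.properties, you can grep it inside the container. This is only a sketch: the /opt/kafka path is an assumption about the wurstmeister/kafka image layout and may differ between image versions.

# assumption: KAFKA_HOME is /opt/kafka inside the wurstmeister/kafka image; adjust the path if needed
docker exec temp_kafka1_1 sh -c 'grep -E "^(advertised\.)?listeners" /opt/kafka/config/server.properties'
# expected (roughly):
# listeners=PLAINTEXT://:9092
# advertised.listeners=PLAINTEXT://kafka1:9092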


  • Run the following command to inspect topic001:


docker exec temp_kafka1_1 \
kafka-topics.sh \
--describe \
--topic topic001 \
--zookeeper zookeeper:2181


  • The output looks like this:


Topic:topic001  PartitionCount:2  ReplicationFactor:1  Configs:
  Topic: topic001  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001
  Topic: topic001  Partition: 1  Leader: 1001  Replicas: 1001  Isr: 1001
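
For reference, if you prefer not to rely on KAFKA_CREATE_TOPICS, a roughly equivalent manual command for this Kafka version (0.11 still manages topics through ZooKeeper) is sketched below; the topic name, partition count and replication factor mirror the compose file:

docker exec temp_kafka1_1 \
kafka-topics.sh \
--create \
--topic topic001 \
--partitions 2 \
--replication-factor 1 \
--zookeeper zookeeper:2181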

Source code download

  • Next we write the source code for the producer and consumer applications. You can also download both projects directly from GitHub at https://github.com/zq2599/blog_demos (a clone command is sketched at the end of this section):



  • The repository contains multiple folders; the source for this article is in the kafka01103consumer and kafka01103producer folders, as shown in the red box in the figure below:


  • Now let's start coding:
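
If you go the download route, cloning the repository and locating the two projects might look like this (the folder names are exactly as in the repository):

git clone https://github.com/zq2599/blog_demos.git
ls blog_demos/kafka01103producer blog_demos/kafka01103consumer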

Develop the message producer application

  • Create a maven project with the following pom.xml:


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>1.5.9.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.bolingcavalry</groupId>    <artifactId>kafka01103producer</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>kafka01103producer</name>    <description>Demo project for Spring Boot</description>
<properties> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.8.RELEASE</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
复制代码


  • Once again: matching the spring-kafka version to the kafka version matters;

  • The configuration file application.properties:


# kafka-related configuration
spring.kafka.bootstrap-servers=kafka1:9092
# default consumer group
spring.kafka.consumer.group-id=0
# key/value serializers and deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# producer batch size (bytes) and buffer memory
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288


  • The message-sending business code is a single MessageController class:


package com.bolingcavalry.kafka01103producer.controller;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

/**
 * @Description: receive web requests and send messages to kafka
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2019/1/1 11:44
 */
@RestController
public class MessageController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping(value = "/send/{name}/{message}", method = RequestMethod.GET)
    public @ResponseBody String send(@PathVariable("name") final String name,
                                     @PathVariable("message") final String message) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timeStr = simpleDateFormat.format(new Date());

        // assemble the message body as JSON
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name", name);
        jsonObject.put("message", message);
        jsonObject.put("time", timeStr);
        jsonObject.put("timeLong", System.currentTimeMillis());
        jsonObject.put("bizID", UUID.randomUUID());

        String sendMessage = jsonObject.toJSONString();

        // send asynchronously and log the result in a callback
        ListenableFuture future = kafkaTemplate.send("topic001", sendMessage);
        future.addCallback(o -> System.out.println("send message success : " + sendMessage),
                throwable -> System.out.println("send message fail : " + sendMessage));

        return "send message to [" + name + "] success (" + timeStr + ")";
    }
}


  • After coding, run mvn clean package -U -DskipTests in the directory containing pom.xml; the file kafka01103producer-0.0.1-SNAPSHOT.jar appears in the target directory. Copy it to the 192.168.1.102 machine;

  • Log in to 192.168.1.102 and, in the directory containing kafka01103producer-0.0.1-SNAPSHOT.jar, run java -jar kafka01103producer-0.0.1-SNAPSHOT.jar to start the producer application; a quick smoke test is sketched below;
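
Assuming the producer started on Spring Boot's default port 8080, a quick smoke test of the endpoint from any machine that can reach 192.168.1.102 might look like this (Tom and hello are arbitrary example values):

# should return: send message to [Tom] success (yyyy-MM-dd HH:mm:ss)
curl http://192.168.1.102:8080/send/Tom/hello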

Develop the message consumer application

  • Create a maven project with the following pom.xml:


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>1.5.9.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.bolingcavalry</groupId>    <artifactId>kafka01103consumer</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>kafka01103consumer</name>    <description>Demo project for Spring Boot</description>
<properties> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.8.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
复制代码


  • Once again: matching the spring-kafka version to the kafka version matters;

  • The configuration file application.properties:


# kafka-related configuration
spring.kafka.bootstrap-servers=192.168.1.101:9092
# default consumer group
spring.kafka.consumer.group-id=0
# key/value serializers and deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# producer batch size (bytes) and buffer memory
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288


  • The message-consuming business code is a single Consumer class; when a message arrives, it prints both the message content and the full record details:


import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @KafkaListener(topics = {"topic001"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            // print the full record and the message payload
            System.out.println("----------------- record =" + record);
            System.out.println("------------------ message =" + message);
        }
    }
}


  • After coding, run mvn clean package -U -DskipTests in the directory containing pom.xml; the file kafka01103consumer-0.0.1-SNAPSHOT.jar appears in the target directory. Copy it to the 192.168.1.104 machine;

  • Log in to 192.168.1.104 and, in the directory containing kafka01103consumer-0.0.1-SNAPSHOT.jar, run java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar to start the consumer application. The console output is as follows:


2019-01-01 13:41:41.747  INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2
2019-01-01 13:41:41.748  INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2
2019-01-01 13:41:41.787  INFO 1422 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2019-01-01 13:41:41.912  INFO 1422 --- [           main] c.b.k.Kafka01103consumerApplication      : Started Kafka01103consumerApplication in 11.876 seconds (JVM running for 16.06)
2019-01-01 13:41:42.699  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0.
2019-01-01 13:41:42.721  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group 0
2019-01-01 13:41:42.723  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2019-01-01 13:41:42.724  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 0
2019-01-01 13:41:42.782  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 5
2019-01-01 13:41:42.788  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-1, topic001-0] for group 0
2019-01-01 13:41:42.805  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-1, topic001-0]
2019-01-01 13:48:00.938  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [topic001-1, topic001-0] for group 0
2019-01-01 13:48:00.939  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[topic001-1, topic001-0]


  • The output above shows that this instance has been assigned both partitions;

  • Now start a second identical instance so that each instance consumes one partition. In the directory containing kafka01103consumer-0.0.1-SNAPSHOT.jar, run java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar --server.port=8081 and watch its console output:


2019-01-01 13:47:58.068  INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2
2019-01-01 13:47:58.069  INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2
2019-01-01 13:47:58.103  INFO 1460 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2019-01-01 13:47:58.226  INFO 1460 --- [           main] c.b.k.Kafka01103consumerApplication      : Started Kafka01103consumerApplication in 11.513 seconds (JVM running for 14.442)
2019-01-01 13:47:59.007  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0.
2019-01-01 13:47:59.030  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group 0
2019-01-01 13:47:59.031  INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2019-01-01 13:47:59.032  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 0
2019-01-01 13:48:00.967  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 6
2019-01-01 13:48:00.985  INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-0] for group 0
2019-01-01 13:48:01.015  INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-0]


  • The new process is consuming partition 0. Going back to the console of the first process, new log lines show that it now consumes only partition 1 (a way to inspect the group assignment from the broker side is sketched after the log below):


2019-01-01 13:48:00.955  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group 0 with generation 6
2019-01-01 13:48:00.960  INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [topic001-1] for group 0
2019-01-01 13:48:00.967  INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[topic001-1]
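
To see the same assignment from the broker's side, Kafka's consumer-group tool can describe group 0 (the group-id set in application.properties). A minimal sketch, run inside the kafka container; the exact output columns may vary by Kafka version:

docker exec temp_kafka1_1 kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group 0
# once both instances are running, topic001-0 and topic001-1 should show different CONSUMER-IDs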

Verify message production and consumption

  • In a browser, open the following address: 192.168.1.102:8080/send/Tom/hello

  • The browser shows the response send message to [Tom] success (2019-01-01 13:58:08), indicating the operation succeeded;

  • Check the consoles of the two consumer processes; one of them has consumed the message, as shown below:


----------------- record =ConsumerRecord(topic = topic001, partition = 0, offset = 0, CreateTime = 1546351226016, serialized key size = -1, serialized value size = 133, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"timeLong":1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01 22:00:25","message":"hello"})
------------------ message ={"timeLong":1546351225804,"name":"Tom","bizID":"4f1b6cf6-78d4-455d-b530-3956723a074f","time":"2019-01-01 22:00:25","message":"hello"}


  • That completes this exercise on having external applications use a Docker-based kafka service. If you also deploy kafka with Docker for external applications to use, I hope this article provides a useful reference;

Welcome to follow me on InfoQ: 程序员欣宸

You are not alone on the learning path; 欣宸's original articles will keep you company all the way...
