写点什么

从零开始搭建 Kafka+SpringBoot 分布式消息系统

用户头像
小Q
关注
发布于: 2020 年 12 月 03 日

前言


由于 kafka 强依赖于 zookeeper,所以需先搭建好 zookeeper 集群。由于 zookeeper 是由 java 编写的,需运行在 jvm 上,所以首先应具备 java 环境。(ps:默认您的 centos 系统可联网,本教程就不教配置 ip 什么的了)(ps2:没有 wget 的先装一下:yum install wget)(ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local 目录下)(ps4:kafka 可能有内置 zookeeper,感觉可以越过 zookeeper 教程,但是这里也配置出来了。我没试过)


文章首发公众号:Java 架构师联盟,每日更新技术好文


一、配置 jdk


因为 oracle 公司不允许直接通过 wget 下载官网上的 jdk 包。所以你直接 wget 以下地址下载下来的是一个只有 5k 的网页文件而已,并不是需要的 jdk 包。(垄断地位就是任性)。(请通过 java -version 判断是否自带 jdk,我的没带)


1、官网下载


下面是 jdk8 的官方下载地址:


https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html
复制代码



2、上传解压


这里通过 xftp 上传到服务器指定位置:/usr/local



对压缩文件进行解压:


tar -zxvf jdk-8u221-linux-x64.tar.gz
复制代码

对解压后的文件夹进行改名:


mv jdk1.8.0_221 jdk1.8
复制代码

3、配置环境变量


vim /etc/profile
复制代码


#java environment
复制代码


export JAVA_HOME=/usr/local/jdk1.8
复制代码


export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
复制代码


export PATH=$PATH:${JAVA_HOME}/bin
复制代码

操作之后的界面如下:



运行命令使环境生效


source /etc/profile
复制代码



二、搭建 zookeeper 集群


1、下载 zookeeper


创建 zookeeper 目录,在该目录下进行下载:


mkdir /usr/local/zookeeper
复制代码

这一步如果出现连接被拒绝时可多试几次,我就是第二次请求才成功的。


wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
复制代码



等待下载完成之后解压:


tar -zxvf zookeeper-3.4.6.tar.gz
复制代码



重命名为 zookeeper1


mv zookeeper-3.4.6 zookeeper1cp -r zookeeper1 zookeeper2cp -r zookeeper1 zookeeper3
复制代码

2、创建 data、logs 文件夹


在 zookeeper1 目录下创建



在 data 目录下新建 myid 文件。内容为 1




3、修改 zoo.cfg 文件


cd /usr/local/zookeeper/zookeeper1/conf/cp zoo_sample.cfg zoo.cfg
复制代码

进行过上面两步之后,有 zoo.cfg 文件了,现在修改内容为:



dataDir=/usr/local/zookeeper/zookeeper1/datadataLogDir=/usr/local/zookeeper/zookeeper1/logsserver.1=192.168.233.11:2888:3888server.2=192.168.233.11:2889:3889server.3=192.168.233.11:2890:3890
复制代码

4、搭建 zookeeper2


首先,复制改名。


cd /usr/local/zookeeper/cp -r zookeeper1 zookeeper2
复制代码

然后修改具体的某些配置:


vim zookeeper2/conf/zoo.cfg
复制代码

将下图三个地方 1 改成 2



vim zookeeper2/data/myid
复制代码

同时将 myid 中的值改成 2



5、搭建 zookeeper3


同上,复制改名


cp -r zookeeper1 zookeeper3
复制代码



vim zookeeper3/conf/zoo.cfg
复制代码

修改为 3



vim zookeeper3/data/myid
复制代码

修改为 3



6、测试 zookeeper 集群


cd /usr/local/zookeeper/zookeeper1/bin/
复制代码

由于启动所需代码比较多,这里简单写了一个启动脚本:


vim start
复制代码

start 的内容如下


cd /usr/local/zookeeper/zookeeper1/bin/./zkServer.sh start ../conf/zoo.cfgcd /usr/local/zookeeper/zookeeper2/bin/./zkServer.sh start ../conf/zoo.cfgcd /usr/local/zookeeper/zookeeper3/bin/./zkServer.sh start ../conf/zoo.cfg
复制代码

下面是连接脚本:


vim login
复制代码

login 内容如下:


./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183
复制代码

脚本编写完成,接下来启动:


sh startsh login
复制代码

启动集群成功,如下图:



这里 zookeeper 就告一段落了,由于 zookeeper 占用着输入窗口,这里可以在 xshell 右键标签,新建 ssh 渠道。然后就可以在新窗口继续操作 kafka 了!



三、搭建 kafka 集群


1、下载 kafka


首先创建 kafka 目录:


mkdir /usr/local/kafka
复制代码

然后在该目录下载


cd /usr/local/kafka/wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
复制代码

下载成功之后解压:


tar -zxvf kafka_2.11-1.1.0.tgz
复制代码

2、修改集群配置


首先进入 conf 目录下:


cd /usr/local/kafka/kafka_2.11-1.1.0/config
复制代码

修改 server.properties 修改内容:


broker.id=0log.dirs=/tmp/kafka-logslisteners=PLAINTEXT://192.168.233.11:9092
复制代码

复制两份 server.properties


cp server.properties server2.propertiescp server.properties server3.properties
复制代码

修改 server2.properties


vim server2.properties
复制代码

修改主要内容为:


broker.id=1log.dirs=/tmp/kafka-logs1listeners=PLAINTEXT://192.168.233.11:9093
复制代码

如上,修改 server3.properties 修改内容为:


broker.id=2log.dirs=/tmp/kafka-logs2listeners=PLAINTEXT://192.168.233.11:9094
复制代码

3、启动 kafka


这里还是在 bin 目录编写一个脚本:


cd ../bin/vim start
复制代码

脚本内容为:


./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &./kafka-server-start.sh ../config/server3.properties &
复制代码

通过 jps 命令可以查看到,共启动了 3 个 kafka。



4、创建 Topic


cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
复制代码



kafka 打印了几条日志



在启动的 zookeeper 中可以通过命令查询到这条 topic!


ls /brokers/topics
复制代码



查看 kafka 状态


bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
复制代码



可以看到此时有三个节点 1 , 2 , 0


Leader 是 1 ,因为分区只有一个 所以在 0 上面,Replicas:主从备份是 1,2,0,ISR(in-sync):现在存活的信息也是 1,2,0


5、启动生产者


bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
复制代码

由于不能按删除,不能按左右键去调整,所以语句有些乱啊。em…



6、启动消费者


bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
复制代码

可以看出,启动消费者之后就会自动消费。



在生产者又造了一条。



消费者自动捕获成功!



四、集成 springboot


先贴一张 kafka 兼容性目录:



不满足的话启动 springboot 的时候会抛异常的!!!ps:该走的岔路我都走了 o(╥﹏╥)o(我的 kafka-clients 是 1.1.0,spring-kafka 是 2.2.2,中间那列暂时不用管)



回归正题,搞了两个小时,终于搞好了,想哭…遇到的问题基本就是 jar 版本不匹配。上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!


1、pom 文件


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.1.1.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.gzky</groupId>    <artifactId>study</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>study</name>    <description>Demo project for Spring Boot</description>
<properties> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.3.8.RELEASE</version> </dependency>
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
</dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>
复制代码

pom 文件中,重点是下面这两个版本。


<parent>       <groupId>org.springframework.boot</groupId>       <artifactId>spring-boot-starter-parent</artifactId>       <version>2.1.1.RELEASE</version>       <relativePath/> <!-- lookup parent from repository -->   </parent><dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>      <version>2.2.0.RELEASE</version></dependency>
复制代码

2、application.yml


spring:  redis:    cluster:      #设置key的生存时间,当key过期时,它会被自动删除;      expire-seconds: 120      #设置命令的执行时间,如果超过这个时间,则报错;      command-timeout: 5000      #设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006  kafka:    # 指定kafka 代理地址,可以多个    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094    producer:      retries: 0      # 每次批量发送消息的数量      batch-size: 16384      buffer-memory: 33554432      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      # 指定默认消费者group id      group-id: test-group      auto-offset-reset: earliest      enable-auto-commit: true      auto-commit-interval: 100      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer
server: port: 8085 servlet: #context-path: /redis context-path: /kafka
复制代码

没有配置 Redis 的可以把 Redis 部分删掉,也就是下图:想学习配置 Redis 集群的可以参考:《Redis 集群 redis-cluster 的搭建及集成 springboot》



3、生产者


package com.gzky.study.utils;
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;
/** * kafka生产者工具类 * * @author biws * @date 2019/12/17 **/@Componentpublic class KfkaProducer {
private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
/** * 生产数据 * @param str 具体数据 */ public void send(String str) { logger.info("生产数据:" + str); kafkaTemplate.send("testTopic", str); }}
复制代码

4、消费者


package com.gzky.study.utils;
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;
/** * kafka消费者监听消息 * * @author biws * @date 2019/12/17 **/@Componentpublic class KafkaConsumerListener {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
@KafkaListener(topics = "testTopic") public void onMessage(String str){ //insert(str);//这里为插入数据库代码 logger.info("监听到:" + str); System.out.println("监听到:" + str); }
}
复制代码

5、对外接口


package com.gzky.study.controller;
import com.gzky.study.utils.KfkaProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;
/** * kafka对外接口 * * @author biws * @date 2019/12/17 **/@RestControllerpublic class KafkaController {
@Autowired KfkaProducer kfkaProducer;
/** * 生产消息 * @param str * @return */ @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET) @ResponseBody public boolean sendTopic(@RequestParam String str){ kfkaProducer.send(str); return true; }}
复制代码

6、postman 测试


这里首先应该在服务器启动监听器(kafka 根目录),下面命令必须是具体的服务器 ip,不能是 localhost,是我踩过的坑:


推荐此处重启一下集群关闭 kafka 命令:


cd /usr/local/kafka/kafka_2.11-1.1.0/bin./kafka-server-stop.sh ../config/server.properties &./kafka-server-stop.sh ../config/server2.properties &./kafka-server-stop.sh ../config/server3.properties &
复制代码

此处应该 jps 看一下,等待所有的 kafka 都关闭(关不掉的 kill 掉),再重新启动 kafka:


./kafka-server-start.sh ../config/server.properties &./kafka-server-start.sh ../config/server2.properties &./kafka-server-start.sh ../config/server3.properties &
复制代码

等待 kafka 启动成功后,启动消费者监听端口:


cd /usr/local/kafka/kafka_2.11-1.1.0bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic
复制代码



曾经我乱输的测试信息全部被监听过来了!


启动 springboot 服务



然后用 postman 生产消息:



然后享受成果,服务器端监听成功。



项目中也监听成功!


发布于: 2020 年 12 月 03 日阅读数: 54
用户头像

小Q

关注

还未添加个人签名 2020.06.30 加入

小Q 公众号:Java架构师联盟 作者多年从事一线互联网Java开发的学习历程技术汇总,旨在为大家提供一个清晰详细的学习教程,侧重点更倾向编写Java核心内容。如果能为您提供帮助,请给予支持(关注、点赞、分享)!

评论

发布
暂无评论
从零开始搭建Kafka+SpringBoot分布式消息系统