写点什么

kafka SpringBoot

用户头像
Rubble
关注
发布于: 48 分钟前
kafka SpringBoot

启动 zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties


启动 kafka

./bin/kafka-server-start.sh ./config/server.properties


创建 topic

bin/kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092


SpringBoot 引入依赖

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>
复制代码


application.yml 配置


server:  port: 8080

spring: application: name: paw-kafka #kafka kafka: # 连接kafka的地址,多个地址用逗号分隔 bootstrap-servers: localhost:9092 #producer producer: #若设置大于0的值,客户端会将发送失败的记录重新发送 retries: 0 #当将多个记录被发送到同一个分区时Producer将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置 batch-size: 16384 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置 buffer-memory: 33554432 #关键字的序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1
#cousumer consumer: enable-auto-commit: false auto-commit-interval: 100ms key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: test-group-id properties: session.timeout.ms: 15000
复制代码


创建生产者、消费者

生产者通过 KafkaTemplate.send 发送消息,send 可以指定 key, 同一 key 发往同一分区,会发给同一消费者保证时序性。消费者通过 @KafkaListener 监听 topic,groupId 指定分组,kafka 将消息发往所有的分组,同一分组会选择一个消费者。

@Service@Slf4jpublic class KfkService {  @Autowired  private KafkaTemplate<Integer,String> kafkaTemplate;

//消费者:监听topic1,groupId1 @KafkaListener(topics = {"topic1"},groupId = "groupId1") public void consumer1(ConsumerRecord<Integer,String> record){ log.info("consumer1 kfk consume message start..."); log.info("consumer1 kfk consume message topic:{},msg:{}",record.topic(),record.value()); log.info("consumer1 kfk consume message end..."); } //消费者:监听topic1,groupId2 @KafkaListener(topics = {"topic1"},groupId = "groupId2") public void consumer3(ConsumerRecord<Integer,String> record){ log.info("consumer3 kfk consume message start..."); log.info("consumer3 kfk consume message topic:{},msg:{}",record.topic(),record.value()); log.info("consumer3 kfk consume message end..."); } //消费者:监听topic1,groupId2 @KafkaListener(topics = {"topic1"},groupId = "groupId2") public void consumer2(ConsumerRecord<Integer,String> record){ log.info("consumer2 kfk consume message start..."); log.info("consumer2 kfk consume message topic:{},msg:{}",record.topic(),record.value()); log.info("consumer2 kfk consume message end..."); }
//生产者 public void sendMsg(String topic , String msg){ log.info("开始发送kfk消息,topic:{},msg:{}",topic,msg);
ListenableFuture<SendResult<Integer, String>> sendMsg = kafkaTemplate.send(topic, msg); //消息确认 sendMsg.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable throwable) { log.error("send error,ex:{},topic:{},msg:{}",throwable,topic,msg); }
@Override public void onSuccess(SendResult<Integer, String> stringStringSendResult) { log.info("send success,topic:{},msg:{}",topic,msg); } }); log.info("kfk send end!"); }
}
复制代码


测试类

@RestControllerpublic class KfkController {    @Autowired    private KfkService kfkService;    @GetMapping("/send")    public String send(){        kfkService.sendMsg("topic1","I am topic msg");        return "success-topic1";    }}
复制代码


gitee: https://gitee.com/tg_seahorse/paw-demos paw-kafka 项目

用户头像

Rubble

关注

还未添加个人签名 2021.06.01 加入

还未添加个人简介

评论

发布
暂无评论
kafka SpringBoot