写点什么

Java Kafka 简单示例

用户头像
关注
发布于: 2021 年 01 月 10 日

Java Kafka 简单示例




简介


    Java kafka 简单代码示例


maven 依赖配置


<!-- kafka --><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.11.0.0</version></dependency>
复制代码


kakfa 生产和消费者生成


import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.*;
/** * @author lw1243925457 */public class KafkaUtil {
public static KafkaConsumer<String, String> createConsumer(String servers, String topic) { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); properties.put("group.id", "group-1"); properties.put("enable.auto.commit", "false"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); kafkaConsumer.subscribe(Arrays.asList(topic)); return kafkaConsumer; }
public static void readMessage(KafkaConsumer<String, String> kafkaConsumer, int timeout) { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(timeout); for (ConsumerRecord<String, String> record : records) { String value = record.value(); kafkaConsumer.commitAsync(); System.out.println(value); } } }
public static KafkaProducer<String, String> createProducer(String servers) { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<String, String>(properties); }
public static void send(KafkaProducer<String, String> producer, String topic, String message) { producer.send(new ProducerRecord<String, String>(topic, message)); }}
复制代码


运行


public class Main {
public static void main(String[] args) { String servers = "localhost:9092,localhost:9093,localhost:9094"; String topic = "TestTopic"; String message = "test";
KafkaProducer<String, String> producer = KafkaUtil.createProducer(servers); KafkaUtil.send(producer, topic, message);
KafkaConsumer<String, String> consumer = KafkaUtil.createConsumer(servers, topic); KafkaUtil.readMessage(consumer, 100); }}
复制代码


使用心得


总是读取最老的消息


    可能是 group-id 的问题,新起一个 group-id 名称


  • earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费

  • latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据

  • none:topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常


参考链接


java 实现kafka消息生产者和消费者kafka(三)—Kafka的Java代码示例和配置说明Kafka - 偏移量提交Kafka系列(四)Kafka消费者:从Kafka中读取数据Kafka auto.offset.reset值详解


发布于: 2021 年 01 月 10 日阅读数: 27
用户头像

关注

还未添加个人签名 2018.09.09 加入

代码是门手艺活,也是门艺术活

评论

发布
暂无评论
Java Kafka 简单示例