kafka 的 JavaAPI 操作
一、创建 maven 工程并添加 jar 包
创建 maven 工程并添加以下依赖 jar 包的坐标到 pom.xml
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
二、生产者代码
1、使用生产者,生产数据
/**
* 订单的生产者代码,
*/
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、连接集群,通过配置文件的方式
* 2、发送数据-topic:order,value
*/
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 发送数据 ,需要一个 producerRecord 对象,最少参数 String topic, V value kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信
息!"+i));
Thread.sleep(100);
}
}
}
2、kafka 当中的数据分区
kafka 生产者发送的消息,都是保存在 broker 当中,我们可以自定义分区规则,决定消息发送到哪个 partition 里面去进行保存
查看 ProducerRecord 这个类的源码,就可以看到 kafka 的各种不同分区策略
kafka 当中支持以下四种数据的分区方式:
第一种分区策略,如果既没有指定分区号,也没有指定数据 key,那么就会使用轮询的方式将数据均匀的发送到不同的分区里面去
//ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "mymessage" + i);
//kafkaProducer.send(producerRecord1);
第二种分区策略 如果没有指定分区号,指定了数据 key,通过 key.hashCode % numPartitions 来计算数据究竟会保存在哪一个分区里面
//注意:如果数据 key,没有变化 key.hashCode % numPartitions = 固定值 所有的数据都会写入到某一个分区里面去
//ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
//kafkaProducer.send(producerRecord2);
第三种分区策略:如果指定了分区号,那么就会将数据直接写入到对应的分区里面去
// ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);
// kafkaProducer.send(producerRecord3);
第四种分区策略:自定义分区策略。如果不自定义分区规则,那么会将数据使用轮询的方式均匀的发送到各个分区里面去
kafkaProducer.send(new ProducerRecord<String, String>("mypartition","mymessage"+i));
自定义分区策略
public class KafkaCustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = partitions.size();
Random random = new Random();
int partition = random.nextInt(partitionNum);
return partition;
}
@Override
public void close() {
}
}
主代码中添加配置
@Test
public void kafkaProducer() throws Exception {
//1、准备配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("partitioner.class", "cn.itcast.kafka.partitioner.KafkaCustomPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、创建 KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i=0;i<100;i++){
//3、发送数据
kafkaProducer.send(new ProducerRecord<String, String>("testpart","0","value"+i));
}
kafkaProducer.close();
}
三、消费者代码
消费必要条件
消费者要从 kafka Cluster 进行消费数据,必要条件有以下四个
#1、地址
bootstrap.servers=node01:9092
#2、序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
#3、主题(topic) 需要制定具体的某个 topic(order)即可。
#4、消费者组 group.id=test
1、 自动提交 offset
消费完成之后,自动提交 offset
/**
* 消费订单数据--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\连接集群
Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-01:9092"); props.put("group.id", "test");
//以下两行代码 ---消费者自动提交 offset 值
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
// 2、发送数据 发送数据需要,订阅下要消费的 topic。 order kafkaConsumer.subscribe(Arrays.asList("order"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer 插入、poll 获取元素。 blockingqueue put 插入原生, take 获取元素
for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value());
}
}
}
}
2、手动提交 offset
如果 Consumer 在获取数据后,需要加入处理,数据完毕后才确认 offset,需要程序来控制 offset 的确认? 关闭自动提交确认选项
props.put("enable.auto.commit", "false");
手动提交 o?set 值
kafkaConsumer.commitSync();
完整代码如下所示:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
//关闭自动提交确认选项
props.put("enable.auto.commit", "false");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 手动提交 offset 值
consumer.commitSync();
buffer.clear();
}
}
3、消费完每个分区之后手动提交 offset
上面的示例使用 commitSync 将所有已接收的记录标记为已提交。大数据培训 在某些情况下,您可能希望通过明确指定偏移量 来更好地控制已提交的记录。 在下面的示例中,在完成处理每个分区中的记录后提交偏移量。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
finally { consumer.close();
注意事项:
提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。 因此,在调用 commitSync(偏移量)时,应该 在最后处理的消息的偏移量中添加一个
4、指定分区数据进行消费
1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 维护的分区的记录。
2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用 YARN,Mesos 或 AWS 工具等集群管理框 架,或作为流处理框架的一部分)。 在这种情况下,Kafka 不需要检测故障并重新分配分区,因为消耗过程将在另 一台机器上重新启动。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList("foo", "bar"));
//手动指定消费指定分区的数据---start
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
//手动指定消费指定分区的数据---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
注意事项:
1、要使用此模式,您只需使用要使用的分区的完整列表调用 assign(Collection),而不是使用 subscribe 订阅 主题。
2、主题与分区订阅只能二选一
5、重复消费与数据丢失
已经消费的数据对于 kafka 来说,会将消费组里面的 offset 值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
提交过程:是通过 kafka 将 offset 进行移动到下个 message 所处的 offset 的位置。
拿到数据后,存储到 hbase 中或者 mysql 中,如果 hbase 或者 mysql 在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么 kafka 伤的 offset 值已经进行了修改了,但是 hbase 或者 mysql 中没有数据,这个时候就会出现数据丢失。
什么时候提交 offset 值?在 Consumer 将数据处理完成之后,再来进行 offset 的修改提交。默认情况下 offset 是 自动提交,需要修改为手动提交 offset 值。
如果在处理代码中正常处理了,但是在提交 offset 请求的时候,没有连接到 kafka 或者出现了故障,那么该次修 改 offset 的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的 offset 值再进行处理一 次,那么在 hbase 中或者 mysql 中就会产生两条一样的数据,也就是数据重复
6、consumer 消费者消费数据流程
流程描述
Consumer 连接指定的 Topic partition 所在 leader broker,采用 pull 方式从 kafkalogs 中获取消息。对于不同的消费模式,会将 offset 保存在不同的地方
官网关于 high level API 以及 low level API 的简介
http://kafka.apache.org/0100/documentation.html#impl_consumer
高阶 API(High Level API)
kafka 消费者高阶 API 简单;隐藏 Consumer 与 Broker 细节;相关信息保存在 zookeeper 中。
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
This method is used to get a list of KafkaStreams, which are iterators over
MessageAndMetadata objects from which you can obtain messages and their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over messages
from topics that match a TopicFilter. (A TopicFilter encapsulates a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */ public commitOffsets()
/* Shut down the connector */ public shutdown()
}
说明:大部分的操作都已经封装好了,比如:当前消费到哪个位置下了,但是不够灵活(工作过程推荐使用)
低级 API(Low Level API)
kafka 消费者低级 API 非常灵活;需要自己负责维护连接 Controller Broker。保存 offset,Consumer Partition 对应 关系。
class SimpleConsumer {
/* Send fetch request to a broker and get back a set of messages. */
public ByteBufferMessageSet fetch(FetchRequest request);
/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest
available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
* offset
*/
说明:没有进行包装,所有的操作有用户决定,如自己的保存某一个分区下的记录,你当前消费到哪个位置。
四、kafka Streams API 开发
需求:使用 StreamAPI 获取 test 这个 topic 当中的数据,然后将数据全部转为大写,写入到 test2 这个 topic 当中去
第一步:创建一个 topic
node01 服务器使用以下命令来常见一个 topic 名称为 test2
cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
第二步:开发 StreamAPI
public class StreamAPI {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
第三步:生产数据
node01 执行以下命令,向 test 这个 topic 当中生产数据
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
第四步:消费数据
node02 执行一下命令消费 test2 这个 topic 当中的数据
cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
评论