对于 kafka 真的是又爱又恨,作为架构和大数据两个方面的通用者, 在这个数据量称雄的时代,越来越起到至关重要的作用,在和同事进行交流的时候,kafka 在开发的过程中如何使用能起到最大的效果成为话题之一,那没有用过 kafka 的你,又该怎么整,没关系,我的粉丝怎么可以有这种尴尬,这里我从环境准备开始搭建一套 kafka 开发的 api,旧版本和新版本的代码联合使用,看完不说你成为大神,起码你在跟同事交流的时候不至于窘迫,但是,一定要自己去实践一下啊
个人公众号:Java 架构师联盟,每日更新技术好文
环境准备
1)在 eclipse 中创建一个 java 工程
2)在工程的根目录创建一个 lib 文件夹
3)解压 kafka 安装包,将安装包 libs 目录下的 jar 包拷贝到工程的 lib 目录下,并 build path。
4)启动 zk 和 kafka 集群,在 kafka 集群中打开一个消费者
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
Kafka 生产者 Java API
创建生产者(过时的 API)
import java.util.Properties;
复制代码
import kafka.javaapi.producer.Producer;
复制代码
import kafka.producer.KeyedMessage;
复制代码
import kafka.producer.ProducerConfig;
复制代码
public class OldProducer {
复制代码
@SuppressWarnings("deprecation")
复制代码
public static void main(String[] args) {
复制代码
Properties properties = new Properties();
复制代码
properties.put("metadata.broker.list", "hadoop102:9092");
复制代码
properties.put("request.required.acks", "1");
复制代码
properties.put("serializer.class", "kafka.serializer.StringEncoder");
复制代码
Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));
复制代码
KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
复制代码
4.2.2 创建生产者(新 API**)
import java.util.Properties;
复制代码
import org.apache.kafka.clients.producer.KafkaProducer;
复制代码
import org.apache.kafka.clients.producer.Producer;
复制代码
import org.apache.kafka.clients.producer.ProducerRecord;
复制代码
public class NewProducer {
复制代码
public static void main(String[] args) {
复制代码
Properties props = new Properties();
复制代码
props.put("bootstrap.servers", "hadoop103:9092");
复制代码
props.put("acks", "all");
复制代码
props.put("batch.size", 16384);
复制代码
props.put("linger.ms", 1);
复制代码
props.put("buffer.memory", 33554432);
复制代码
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
for (int i = 0; i < 50; i++) {
复制代码
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
复制代码
创建生产者带回调函数(新 API)
import java.util.Properties;
复制代码
import org.apache.kafka.clients.producer.Callback;
复制代码
import org.apache.kafka.clients.producer.KafkaProducer;
复制代码
import org.apache.kafka.clients.producer.ProducerRecord;
复制代码
import org.apache.kafka.clients.producer.RecordMetadata;
复制代码
public class CallBackProducer {
复制代码
public static void main(String[] args) {
复制代码
Properties props = new Properties();
复制代码
props.put("bootstrap.servers", "hadoop103:9092");
复制代码
props.put("acks", "all");
复制代码
props.put("batch.size", 16384);
复制代码
props.put("linger.ms", 1);
复制代码
props.put("buffer.memory", 33554432);
复制代码
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
复制代码
for (int i = 0; i < 50; i++) {
复制代码
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
复制代码
public void onCompletion(RecordMetadata metadata, Exception exception) {
复制代码
System.err.println(metadata.partition() + "---" + metadata.offset());
复制代码
4.2.4 自定义分区生产者
0)需求:将所有数据存储到 topic 的第 0 号分区上
1)定义一个类实现 Partitioner 接口,重写里面的方法(过时 API)
import kafka.producer.Partitioner;
复制代码
public class CustomPartitioner implements Partitioner {
复制代码
public CustomPartitioner() {
复制代码
public int partition(Object key, int numPartitions) {
复制代码
2)自定义分区(新 API)
import org.apache.kafka.clients.producer.Partitioner;
复制代码
import org.apache.kafka.common.Cluster;
复制代码
public class CustomPartitioner implements Partitioner {
复制代码
public void configure(Map<String, ?> configs) {
复制代码
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
复制代码
3)在代码中调用
import java.util.Properties;
复制代码
import org.apache.kafka.clients.producer.KafkaProducer;
复制代码
import org.apache.kafka.clients.producer.Producer;
复制代码
import org.apache.kafka.clients.producer.ProducerRecord;
复制代码
public class PartitionerProducer {
复制代码
public static void main(String[] args) {
复制代码
Properties props = new Properties();
复制代码
props.put("bootstrap.servers", "hadoop103:9092");
复制代码
props.put("acks", "all");
复制代码
props.put("batch.size", 16384);
复制代码
props.put("linger.ms", 1);
复制代码
props.put("buffer.memory", 33554432);
复制代码
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
props.put("partitioner.class", "com.root.kafka.CustomPartitioner");
复制代码
Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
producer.send(new ProducerRecord<String, String>("first", "1", "root"));
复制代码
4)测试
(1)在 hadoop102 上监控/opt/module/kafka/logs/目录下 first 主题 3 个分区的 log 日志动态变化情况
[root@hadoop102 first-0]$ tail -f 00000000000000000000.log
复制代码
[root@hadoop102 first-1]$ tail -f 00000000000000000000.log
复制代码
[root@hadoop102 first-2]$ tail -f 00000000000000000000.log
复制代码
(2)发现数据都存储到指定的分区了。
Kafka 消费者 Java API
0)在控制台创建发送者
[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
1)创建消费者(过时 API)
package com.root.kafka.consume;
复制代码
import java.util.HashMap;
复制代码
import java.util.Properties;
复制代码
import kafka.consumer.Consumer;
复制代码
import kafka.consumer.ConsumerConfig;
复制代码
import kafka.consumer.ConsumerIterator;
复制代码
import kafka.consumer.KafkaStream;
复制代码
import kafka.javaapi.consumer.ConsumerConnector;
复制代码
public class CustomConsumer {
复制代码
@SuppressWarnings("deprecation")
复制代码
public static void main(String[] args) {
复制代码
Properties properties = new Properties();
复制代码
properties.put("zookeeper.connect", "hadoop102:2181");
复制代码
properties.put("group.id", "g1");
复制代码
properties.put("zookeeper.session.timeout.ms", "500");
复制代码
properties.put("zookeeper.sync.time.ms", "250");
复制代码
properties.put("auto.commit.interval.ms", "1000");
复制代码
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
复制代码
HashMap<String, Integer> topicCount = new HashMap<>();
复制代码
topicCount.put("first", 1);
复制代码
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
复制代码
KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
复制代码
ConsumerIterator<byte[], byte[]> it = stream.iterator();
复制代码
System.out.println(new String(it.next().message()));
复制代码
2)官方提供案例(自动维护消费情况)(新 API)
package com.root.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定义kakfa 服务的地址,不需要将所有broker指定上
props.put("bootstrap.servers", "hadoop102:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自动确认offset
props.put("enable.auto.commit", "true");
// 自动确认offset的时间间隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的topic, 可同时订阅多个
consumer.subscribe(Arrays.asList("first", "second","third"));
while (true) {
// 读取数据,读取超时时间为100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}`
复制代码
Kafka producer 拦截器(interceptor)
拦截器原理
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率
(4)close:
关闭 interceptor,主要用于执行一些资源清理工作
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
拦截器案例
1)需求:
实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
2)案例实操
(1)增加时间戳拦截器
package com.root.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 创建一个新的record,把时间戳写入消息体的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
复制代码
(2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
package com.root.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
复制代码
(3)producer 主程序
package com.root.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 设置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 构建拦截链
List<String> interceptors = new ArrayList<>();
interceptors.add("com.root.kafka.interceptor.TimeInterceptor"); interceptors.add("com.root.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "first";
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
producer.send(record);
}
// 4 一定要关闭producer,这样才会调用interceptor的close方法
producer.close();
}
}
复制代码
3)测试
(1)在 kafka 上启动消费者,然后运行客户端 java 程序。
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
复制代码
(2)观察 java 平台控制台输出数据如下:
Successful sent: 10
Failed sent: 0
复制代码
kafka Streams
概述
Kafka Streams
Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。
Kafka Streams 特点
1)功能强大
高扩展性,弹性,容错
2)轻量级
无需专门的集群
一个库,而不是框架
3)完全集成
100%的 Kafka 0.10.0 版本兼容
易于集成到现有的应用程序
4)实时性
毫秒级延迟
并非微批处理
窗口允许乱序数据
允许迟到数据
为什么要有 Kafka Stream
当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm。Apache Storm 发展多年,应用广泛,提供记录级别的处理能力,当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便与图计算,SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外,目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。
既然 Apache Spark 与 Apache Storm 拥有如此多的优势,那为何还需要 Kafka Stream 呢?主要有如下原因。
第一,Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。
第三,就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的 kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用 Kafka Stream 的成本非常低。
第四,使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不占用系统资源。
第五,由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。
第六,由于 Kafka Consumer Rebalance 机制,Kafka Stream 可以在线动态调整并行度。
Kafka Stream 数据清洗案例
0)需求:
实时处理单词带有”>>>”前缀的内容。例如输入”root>>>ximenqing”,最终处理成“ximenqing”
1)需求分析:
2)案例实操
(1)创建一个工程,并添加 jar 包
(2)创建主类
package com.root.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定义输入的topic
String from = "first";
// 定义输出的topic
String to = "second";
// 设置参数
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
StreamsConfig config = new StreamsConfig(settings);
// 构建拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
@Override
public Processor<byte[], byte[]> get() {
// 具体分析处理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
// 创建kafka stream
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
复制代码
(3)具体业务处理
package com.root.kafka.stream;
复制代码
import org.apache.kafka.streams.processor.Processor;
复制代码
import org.apache.kafka.streams.processor.ProcessorContext;
复制代码
public class LogProcessor implements Processor<byte[], byte[]> {
复制代码
private ProcessorContext context;
复制代码
public void init(ProcessorContext context) {
复制代码
public void process(byte[] key, byte[] value) {
复制代码
String input = new String(value);
复制代码
if (input.contains(">>>")) {
复制代码
input = input.split(">>>")[1].trim();
复制代码
context.forward("logProcessor".getBytes(), input.getBytes());
复制代码
context.forward("logProcessor".getBytes(), input.getBytes());
复制代码
public void punctuate(long timestamp) {
复制代码
(4)运行程序
(5)在 hadoop104 上启动生产者
[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
复制代码
(6)在 hadoop103 上启动消费者
[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic second
复制代码
评论