写点什么

Flink 获取 kafka 中每条消息对应的 topic

用户头像
shengjk1
关注
发布于: 2020 年 04 月 23 日

工作中经常会需要使用 Flink 来消费 kafka 中的数据并获取每条 msg 对应 topic 的信息,现总结如下:



  1. 首先自定义个 KafkaDeserializationSchema

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
@Override
//nextElement 是否表示流的最后一条元素,我们要设置为 false ,因为我们需要 msg 源源不断的被消费
public boolean isEndOfStream(Tuple2<String, String> nextElement) {
return false;
}
@Override
// 反序列化 kafka 的 record,我们直接返回一个 tuple2<kafkaTopicName,kafkaMsgValue>
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
}
@Override
//告诉 Flink 我输入的数据类型, 方便 Flink 的类型推断
public TypeInformation<Tuple2<String, String>> getProducedType() {
return new TupleTypeInfo<>(BasicTypeInfo.STRINGTYPEINFO, BasicTypeInfo.STRINGTYPEINFO);
}
}
  1. 使用自定义的 KafkaDeserializationSchema 进行消费

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), properties);
kafkaConsumer.setStartFromEarliest();
env.addSource(kafkaConsumer).flatMap(new FlatMapFunction<Tuple2<String, String>, Object>() {
@Override
public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
System.out.println("topic==== " + value.f0);
}
});
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
  1. 具体的data types

https://blog.csdn.net/jsjsjs1789/article/details/104903409

发布于: 2020 年 04 月 23 日阅读数: 66
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
Flink获取kafka中每条消息对应的topic