/**
 * Entry point of a small Flink streaming job that consumes the Kafka topic
 * {@code "test"} and prints the first tuple field of every record to stdout.
 *
 * <p>Records are decoded by {@code CustomKafkaDeserializationSchema} (defined
 * outside this block) into {@code Tuple2<String, String>} values.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception propagated from the Flink calls made here
 */
public static void main(String[] args) throws Exception {
    // Kafka client settings: broker address and consumer group.
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "test");

    FlinkKafkaConsumer<Tuple2<String, String>> source =
            new FlinkKafkaConsumer<>("test", new CustomKafkaDeserializationSchema(), kafkaProps);
    source.setStartFromEarliest();

    // Kept as an anonymous class (not a lambda) so the function object is the
    // same kind of construct as before; it only prints and never emits downstream.
    FlatMapFunction<Tuple2<String, String>, Object> topicPrinter =
            new FlatMapFunction<Tuple2<String, String>, Object>() {
                @Override
                public void flatMap(Tuple2<String, String> value, Collector<Object> out) throws Exception {
                    // NOTE(review): f0 is presumably the topic name, judging by the label — confirm
                    // against CustomKafkaDeserializationSchema.
                    System.out.println("topic==== " + value.f0);
                }
            };

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(source).flatMap(topicPrinter);
    env.execute("Flink Streaming Java API Skeleton");
}
// Comments