写点什么

Flink 消费 Kafka

发布于: 2021 年 05 月 17 日
Flink消费Kafka

kafka 常用命令

# 启动Kafkanohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &# 创建Topic/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 3 --topic bj31# 查看Topic信息/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic bj31# 生产者/export/servers/kafka_2.11-1.0.0/bin/kafka-console-producer.sh --broker-list node01:9092 --topic bj31# 消费者/export/servers/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic bj31# 获取消费组的信息/export/servers/kafka_2.11-1.0.0/bin/kafka-consumer-groups.sh --group bigdata031 --bootstrap-server node01:9092 --describe
复制代码

Flink 消费 Kafka 数据

使用方式:创建一个FlinkKafkaConsumer09


  def main(args: Array[String]): Unit = {    //1.获取流处理的执行环境    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//定义topic和kafka相关配置 val topic = "bj31" val props = new Properties() //添加kafka 相关配置,节点信息.groupid... props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092") props.setProperty("group.id", "bigdata31") //创建消费者 val counsumer = new FlinkKafkaConsumer09[String](topic, new SimpleStringSchema(),props) //加载Kafka数据源 val source: DataStream[String] = env.addSource(counsumer) //打印 source.print() //执行 env.execute()
}
复制代码

Kafka 的不同的消费方式

    //创建消费者    val consumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(),props)    //从头开始消费//    consumer.setStartFromEarliest()    //从尾部开始消费//    consumer.setStartFromLatest()    //从当前消费组的最后一条开始消费,09版本并不支持这个操作.//    consumer.setStartFromGroupOffsets()    val map = new util.HashMap[KafkaTopicPartition, lang.Long]()    //设置指定的分区以及偏移量    map.put(new KafkaTopicPartition("bj31", 0), 5L)    map.put(new KafkaTopicPartition("bj31", 1), 5L)    map.put(new KafkaTopicPartition("bj31", 2), 5L)    consumer.setStartFromSpecificOffsets(map)
复制代码

Kafka 动态分区检测

利用 FlinkKafka 提供的动态检测功能,能够发现分区的变化,实时调整


//动态感知kafka主题分区的增加 单位毫秒properties.setProperty("flink.partition-discovery.interval-millis", "5000");
复制代码

Kafka 分区和 Flink 并行度关系

一般我们都会让 Kafka 的分区数和 Flink 的并行数相等,让每个线程都能够消费数据.

从 MySQL 中加载数据

object MySQLSourceDemo {
def main(args: Array[String]): Unit = { //1.获取流处理的执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//加载数据源 用,号分割的数据. val source: DataStream[String] = env.addSource(new MySQLSource) }}
class MySQLSource extends RichSourceFunction[String] { var resultSet: ResultSet = _ /** * 在open中加载数据源 * @param parameters */ override def open(parameters: Configuration): Unit = { //jdbc //注册驱动 var className ="com.jdbc.mysql.Driver" Class.forName(className) val conn: Connection = DriverManager.getConnection("jdbc:mysql://node01:3306/db", "root", "123456") //构建一个预处理对象 val preparedStatement: PreparedStatement = conn.prepareStatement("select * from user") //执行 resultSet = preparedStatement.executeQuery() }
/** * 获取数据 * @param ctx */ override def run(ctx: SourceFunction.SourceContext[String]): Unit = { //遍历resultSet获取数据 while (resultSet.next()){ //获取值 val username: String = resultSet.getString("username") val address: String = resultSet.getString("address") val tel: String = resultSet.getString("tel") ctx.collect(username + "," + address + "," + tel) } }
override def cancel(): Unit = ???}
复制代码


发布于: 2021 年 05 月 17 日阅读数: 10
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink消费Kafka