Flink 消费 Kafka
发布于: 2021 年 05 月 17 日
kafka 常用命令
# 启动Kafka
nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &
# 创建Topic
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 3 --topic bj31
# 查看Topic信息
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic bj31
# 生产者
/export/servers/kafka_2.11-1.0.0/bin/kafka-console-producer.sh --broker-list node01:9092 --topic bj31
# 消费者
/export/servers/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic bj31
# 获取消费组的信息
/export/servers/kafka_2.11-1.0.0/bin/kafka-consumer-groups.sh --group bigdata031 --bootstrap-server node01:9092 --describe
复制代码
Flink 消费 Kafka 数据
使用方式:创建一个FlinkKafkaConsumer09
def main(args: Array[String]): Unit = {
//1.获取流处理的执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//定义topic和kafka相关配置
val topic = "bj31"
val props = new Properties()
//添加kafka 相关配置,节点信息.groupid...
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "bigdata31")
//创建消费者
val counsumer = new FlinkKafkaConsumer09[String](topic, new SimpleStringSchema(),props)
//加载Kafka数据源
val source: DataStream[String] = env.addSource(counsumer)
//打印
source.print()
//执行
env.execute()
}
复制代码
Kafka 的不同的消费方式
//创建消费者
val consumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(),props)
//从头开始消费
// consumer.setStartFromEarliest()
//从尾部开始消费
// consumer.setStartFromLatest()
//从当前消费组的最后一条开始消费,09版本并不支持这个操作.
// consumer.setStartFromGroupOffsets()
val map = new util.HashMap[KafkaTopicPartition, lang.Long]()
//设置指定的分区以及偏移量
map.put(new KafkaTopicPartition("bj31", 0), 5L)
map.put(new KafkaTopicPartition("bj31", 1), 5L)
map.put(new KafkaTopicPartition("bj31", 2), 5L)
consumer.setStartFromSpecificOffsets(map)
复制代码
Kafka 动态分区检测
利用 FlinkKafka 提供的动态检测功能,能够发现分区的变化,实时调整
//动态感知kafka主题分区的增加 单位毫秒
properties.setProperty("flink.partition-discovery.interval-millis", "5000");
复制代码
Kafka 分区和 Flink 并行度关系
一般我们都会让 Kafka 的分区数和 Flink 的并行数相等,让每个线程都能够消费数据.
从 MySQL 中加载数据
object MySQLSourceDemo {
def main(args: Array[String]): Unit = {
//1.获取流处理的执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//加载数据源 用,号分割的数据.
val source: DataStream[String] = env.addSource(new MySQLSource)
}
}
class MySQLSource extends RichSourceFunction[String] {
var resultSet: ResultSet = _
/**
* 在open中加载数据源
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
//jdbc
//注册驱动
var className ="com.jdbc.mysql.Driver"
Class.forName(className)
val conn: Connection = DriverManager.getConnection("jdbc:mysql://node01:3306/db", "root", "123456")
//构建一个预处理对象
val preparedStatement: PreparedStatement = conn.prepareStatement("select * from user")
//执行
resultSet = preparedStatement.executeQuery()
}
/**
* 获取数据
* @param ctx
*/
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
//遍历resultSet获取数据
while (resultSet.next()){
//获取值
val username: String = resultSet.getString("username")
val address: String = resultSet.getString("address")
val tel: String = resultSet.getString("tel")
ctx.collect(username + "," + address + "," + tel)
}
}
override def cancel(): Unit = ???
}
复制代码
划线
评论
复制
发布于: 2021 年 05 月 17 日阅读数: 10
版权声明: 本文为 InfoQ 作者【大数据技术指南】的原创文章。
原文链接:【http://xie.infoq.cn/article/447a51161556948d0829648a0】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
大数据技术指南
关注
还未添加个人签名 2021.03.07 加入
还未添加个人简介
评论