写点什么

Flink 常见数据源

发布于: 2021 年 05 月 18 日
Flink常见数据源

从套接字获取数据

    val source: DataStream[String] = env.socketTextStream("node01", 9999)
复制代码

从文件中获取数据

 //从指定的文件中加载数据    val source: DataStream[String] = env.readTextFile("hello.txt")
复制代码

自定义数据源

单并行度的数据源

如果设置并行度超过 1,会报错.


开发步骤:


1. 先定义一个自定义的类继承SourceFunction.
复制代码


  1. 实现里面的 run 方法的逻辑,通过调用 ctx.collect()方法来产生数据


/**  * 自定义数据源加载数据  */object MySourceDemo {
def main(args: Array[String]): Unit = { //1.获取流处理的执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //加载自定义数据源 val source: DataStream[Int] = env.addSource(new MySourceFunction) //打印 source.print() //执行程序 env.execute() }}//自定义数据源class MySourceFunction extends SourceFunction[Int] { /** * 自定义数据源,通过这个方法来生成我们的数据 * @param ctx 采集器. */ override def run(ctx: SourceFunction.SourceContext[Int]): Unit = { var i = 0 while (true) { //使用采集器,将数据采集走.外部source就可以获取到数据 ctx.collect(i) i = i + 1 Thread.sleep(500) } }
/** * 如果需要取消,调用此方法. */ override def cancel(): Unit = ???}
复制代码

多并行度数据源

我们可以通过让自定义数据源继承 ParallelSourceFunction 的方式来实现多并行度的数据源.


//加载多并行度数据源    val source: DataStream[Int] = env.addSource(new MyParallelSourceFunction).setParallelism(2)
复制代码


//自定义多并行度数据源class MyParallelSourceFunction extends ParallelSourceFunction[Int] {
复制代码


发布于: 2021 年 05 月 18 日阅读数: 10
用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Flink常见数据源