写点什么

Flink 初体验

用户头像
数据社
关注
发布于: 2020 年 04 月 22 日
Flink初体验

一、flink介绍

Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

二、部署环境

操作系统环境:

flink支持Linux, Mac OS X, 和 Windows环境部署,本次部署选择Linux环境部署。

JDK: 要求Java 7或者更高

三、下载软件

  • jdk1.8.0_144

  • flink-1.4.2-bin-hadoop26-scala_2.11.tgz

四、部署步骤

1、JDK安装步骤此处省略,安装后验证下JDK环境

$ java -versionopenjdk version "1.8.0_144"OpenJDK Runtime Environment (build 1.8.0_144-b01)OpenJDK 64-Bit Server VM (build 25.144-b01, mixed mode)

2、安装部署flink 本文介绍flink部署分为两种模式:local,standalone。下面依次介绍这两种模式的部署方式。

找到下载的flink压缩包,进行解压

$ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz

首先是local模式,最为简单。

$ cd flink-1.4.2$ bin/start-local.shStarting job manager

我们可以通过查看日志确认是否启动成功

$ tailf flink-csap-taskmanager-0-XXXX.log2018-05-03 10:07:53,718 INFO org.apache.flink.runtime.filecache.FileCache                 - User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager             - Starting TaskManager actor at akka://flink/user/taskmanager#-524742300.2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager             - TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc @ LY1F-R021707-VM14.local (dataPort=55234)2018-05-03 10:07:53,726 INFO org.apache.flink.runtime.taskmanager.TaskManager             - TaskManager has 1 task slot(s).2018-05-03 10:07:53,727 INFO org.apache.flink.runtime.taskmanager.TaskManager             - Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]2018-05-03 10:07:53,730 INFO org.apache.flink.runtime.taskmanager.TaskManager             - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)2018-05-03 10:07:53,848 INFO org.apache.flink.runtime.taskmanager.TaskManager             - Successful registration at JobManager (akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.2018-05-03 10:07:53,851 INFO org.apache.flink.runtime.taskmanager.TaskManager             - Determined BLOB server address to be localhost/127.0.0.1:52382. Starting BLOB cache.2018-05-03 10:07:53,858 INFO org.apache.flink.runtime.blob.PermanentBlobCache             - Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b2018-05-03 10:07:53,861 INFO org.apache.flink.runtime.blob.TransientBlobCache             - Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778​

JobManager进程将会在8081端口上启动一个WEB页面,我们可以通过浏览器到hostname:8081中查看相关的信息。 可以打开页面查看到相关信息,说明local模式部署是没问题的。



下面来看一下standlone部署方式。

安装JDK,解压压缩包,都是一样的。不一样的是我们要修改解压后的flink配置文件。然后在集群主机间做免密,免密操作方法。

修改conf/flink-conf.yaml,我们将jobmanager.rpc.address的值设置成你master节点的IP地址。此外,我们通过jobmanager.heap.mb和taskmanager.heap.mb配置参数来设置每个节点的JVM能够分配的最大内存。从配置参数名字可以看出,这个参数的单位是MB,如果某些节点拥有比你之前设置的值更多的内存时,我们可以在那个节通过FLINK_TM_HEAP参数类覆盖值钱的设置。

我们需要把所有将要作为worker节点的IP地址存放在conf/slaves文件中,在conf/slaves文件中,每个IP地址必须放在一行,如下:

192.168.0.100192.168.0.101...192.168.0.150

然后将修改好的flink包整理复制到集群各个节点。每个节点flink路径保持一致。然后启动集群

$ bin/start-cluster.sh​

查看日志是否成功。

以上是部署方法,部署成功后,我们来跑一个demo程序,验证一下Flink的流处理功能,对其有个初步的了解。

flink为了更好的让大家理解,已经给大家提供了一些demo代码,demo的jar包可以在/examples/streaming首先看一下demo代码:

object SocketWindowWordCount {​   def main(args: Array[String]) : Unit = {​       // the port to connect to       val port: Int = try {           ParameterTool.fromArgs(args).getInt("port")       } catch {           case e: Exception => {               System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")               return           }       }​       // get the execution environment       val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment​       // get input data by connecting to the socket       val text = env.socketTextStream("localhost", port, '\n')​       // parse the data, group it, window it, and aggregate the counts       val windowCounts = text           .flatMap { w => w.split("\\s") }           .map { w => WordWithCount(w, 1) }           .keyBy("word")           .timeWindow(Time.seconds(5), Time.seconds(1))           .sum("count")​       // print the results with a single thread, rather than in parallel       windowCounts.print().setParallelism(1)​       env.execute("Socket Window WordCount")   }​   // Data type for words with count   case class WordWithCount(word: String, count: Long)}

这个demo是监控端口,然后对端口输入单子进行wordcount的程序。

运行demo,首先打开一个窗口进行端口数据输入:

$ nc -l 9001hellohellowordworld​

然后运行demo监控端口单词输入统计:

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001

运行后可以看到结果统计:

$ more flink-csap-taskmanager-0-XXX.out.1
hello : 1
hello : 1
word : 1
world : 1



五、IEDA开发环境

1、安装maven

参考博客Maven安装与配置

2、配置IDEA

参考博客如何使用IntelliJ IDEA 配置Maven

3、pom文件设置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>flink</groupId>
<artifactId>flink-dev</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.7.6</hadoop.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!-- <arg>-make:transitive</arg> -->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.spark.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

4、代码示例0

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Author: qincf
* Date: 2018/11/02
* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
* 先在目标主机1.1.1.1机器上执行nc -l 9000
*/
public class StreamingWindowWordCount {
public static void main(String[] args) throws Exception {
//定义socket的端口号
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("没有指定port参数,使用默认值9000");
port = 9000;
}
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");
//计算数据
DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word:splits) {
out.collect(new WordWithCount(word,1L));
}
}
})//打平操作,把每行的单词转为<word,count>类型的数据
//针对相同的word数据进行分组
.keyBy("word")
//指定计算数据的窗口大小和滑动窗口大小
.timeWindow(Time.seconds(2),Time.seconds(1))
.sum("count");
//获取可视化JSON
System.out.println(env.getExecutionPlan());
//把数据打印到控制台,使用一个并行度
windowCount.print().setParallelism(1);
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
env.execute("streaming word count");
}
/**
* 主要为了存储单词以及单词出现的次数
*/
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

5、测试步骤

首先在1.1.1.1机器上使用nc命令模拟数据发送

nc -l 9000

然后在IEDA中运营StreamingWindowWordCount程序 在主机上输入字符

[root@data01]# nc -l 9000
a
a
b
c
d
d

此时运行程序后,IDEA中会打印处结果

PassThroughWindowFunction)","pact":"Operator","contents":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","parallelism":8,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]}]}
WordWithCount{word='a', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='b', count=1}

大家会看到,wordcount的结果。 仔细看还有一串json输出,这部分是什么呢? 代码中加了一个打印执行计划的部分:

/获取可视化JSON
System.out.println(env.getExecutionPlan());

Flink提供了一个可视化执行计划的结果,类似Spark的DAG图,把json粘贴到Flink Plan Visualizer可以看到执行计划图:








发布于: 2020 年 04 月 22 日阅读数: 197
用户头像

数据社

关注

微信公众号:数据社 2018.04.26 加入

专注大数据架构,数据仓库,MPP数据库分享,微信公众号数据社

评论

发布
暂无评论
Flink初体验