写点什么

第一个 spark 应用开发详解 (java 版)

作者:程序员欣宸
  • 2022 年 8 月 12 日
    广东
  • 本文字数:4921 字

    阅读完需:约 16 分钟

第一个spark应用开发详解(java版)

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


  • WordCount 是大数据学习最好的入门 demo,今天就一起开发 java 版本的 WordCount,然后提交到 Spark2.3.2 环境运行;

版本信息

  1. 操作系统:CentOS7;

  2. JDK:1.8.0_191;

  3. Spark:2.3.3;

  4. Scala:2.11.12;

  5. Hadoop:2.7.7;

  6. Maven:3.5.0;

关于 hadoop 环境

关于 spark 环境

  • 本次实战用到了 spark2.3.3,关于 spark 集群的部署,请参考《部署spark2.2集群(standalone模式)》;请注意,由于 2.3.3 版本的 spark-core 的 jar 包不支持 scala2.12,所以在部署 spark 的时候,scala 版本请使用 2.11;

关于本次实战开发的应用

  • 本次实战开发的应用是经典的 WorkCount,也就是指定一个文本文件,统计其中每个单词出现的次数,再取出现次数最多的 10 个,打印出来,并保存在 hdfs 文件中;

本次统计单词数用到的文本

  • 本次用到的 txt 文件,是我在网上找到的 pdf 版本的《乱世佳人》英文版,然后导出为 txt,读者您可以自行选择适合的 txt 文件来测试;

  • 在 hdfs 服务所在的机器上执行以下命令,创建 input 文件夹:


~/hadoop-2.7.7/bin/hdfs dfs -mkdir /input
复制代码


  • 在 hdfs 服务所在的机器上执行以下命令,创建 output 文件夹:


~/hadoop-2.7.7/bin/hdfs dfs -mkdir /output
复制代码


  • 把本次用到的 text 文件上传到 hdfs 服务所在的机器,再执行以下命令将文本文件上传到 hdfs 的/input 文件夹下:


~/hadoop-2.7.7/bin/hdfs dfs -put ~/GoneWiththeWind.txt /input
复制代码

源码下载

  • 接下来详细讲述应用的编码过程,如果您不想自己写代码,也可以在 GitHub 下载完整的应用源码,地址和链接信息如下表所示:



  • 这个 git 项目中有多个文件夹,本章源码在 sparkwordcount 这个文件夹下,如下图红框所示:

开发应用

  • 基于 maven 创建一个 java 应用 sparkwordcount,pom.xml 的内容如下:


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId> <artifactId>sparkwordcount</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.2</version> </dependency> </dependencies>
<build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>false</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>com.bolingcavalry.sparkwordcount.WordCount</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build></project>
复制代码


  • 创建 WrodCount 类,关键代码位置都有注释,就不再赘述了:


package com.bolingcavalry.sparkwordcount;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import scala.Tuple2;
import java.text.SimpleDateFormat;import java.util.Arrays;import java.util.Date;import java.util.List;
/** * @Description: spark的WordCount实战 * @author: willzhao E-mail: zq2599@gmail.com * @date: 2019/2/8 17:21 */public class WordCount {
public static void main(String[] args) { String hdfsHost = args[0]; String hdfsPort = args[1]; String textFileName = args[2];
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://" + hdfsHost + ":" + hdfsPort; //文本文件的hdfs路径 String inputPath = hdfsBasePath + "/input/" + textFileName;
//输出结果文件的hdfs路径 String outputPath = hdfsBasePath + "/output/" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
//导入文件 JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
JavaPairRDD<String, Integer> counts = textFile //每一行都分割成单词,返回后组成一个大集合 .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) //key是单词,value是1 .mapToPair(word -> new Tuple2<>(word, 1)) //基于key进行reduce,逻辑是将value累加 .reduceByKey((a, b) -> a + b);
//先将key和value倒过来,再按照key排序 JavaPairRDD<Integer, String> sorts = counts //key和value颠倒,生成新的map .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1())) //按照key倒排序 .sortByKey(false);
//取前10个 List<Tuple2<Integer, String>> top10 = sorts.take(10);
//打印出来 for(Tuple2<Integer, String> tuple2 : top10){ System.out.println(tuple2._2() + "\t" + tuple2._1()); }
//分区合并成一个,再导出为一个txt保存在hdfs javaSparkContext.parallelize(top10).coalesce(1).saveAsTextFile(outputPath);
//关闭context javaSparkContext.close(); }}
复制代码


  • 在 pom.xml 目录下执行以下命令,编译构建 jar 包:


mvn clean package -Dmaven.test.skip=true
复制代码


  • 构建成功后,在 target 目录下生成文件 sparkwordcount-1.0-SNAPSHOT.jar,上传到 spark 服务器的**~/jars/**目录下;

  • 假设 spark 服务器的 IP 地址为 192.168.119.163,在 spark 服务器执行以下命令即可提交任务:


~/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \--master spark://192.168.119.163:7077 \--class com.bolingcavalry.sparkwordcount.WordCount \--executor-memory 512m \--total-executor-cores 2 \~/jars/sparkwordcount-1.0-SNAPSHOT.jar \192.168.119.163 \8020 \GoneWiththeWind.txt
复制代码


  • 上述命令的最后三个参数,是 java 的 main 方法的入参,具体的使用请参照 WordCount 类的源码;

  • 提交成功后立即开始执行任务,看到类似如下信息表示任务完成:


2019-02-08 21:26:04 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped2019-02-08 21:26:04 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!2019-02-08 21:26:04 INFO  SparkContext:54 - Successfully stopped SparkContext2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Shutdown hook called2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-c3e2ea9e-7daf-4cab-a207-26f0a03940172019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-d60e4d75-4189-4f33-a5e2-fbe9b06bdae7
复制代码


  • 往前翻滚一下控制台输出的信息,如下所示,可以见到单词统计的前十名已经输出在控制台了:


2019-02-08 21:36:15 INFO  DAGScheduler:54 - Job 1 finished: take at WordCount.java:61, took 0.313008 sthe  18264and  14150to  10020of  8615a  7571her  7086she  6217was  5912in  5751had  45022019-02-08 21:36:15 INFO  deprecation:1173 - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir2019-02-08 21:36:15 INFO  FileOutputCommitter:108 - File Output Committer Algorithm version is 1
复制代码


  • 在 hdfs 服务器执行查看文件的命令,可见/output 下新建了子目录 20190208213610:


[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /outputFound 1 itemsdrwxr-xr-x   - hadoop supergroup          0 2019-02-08 21:36 /output/20190208213610
复制代码


  • 查看子目录,发现里面有两个文件:


[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output/20190208213610Found 2 items-rw-r--r--   3 hadoop supergroup          0 2019-02-08 21:36 /output/20190208213610/_SUCCESS-rw-r--r--   3 hadoop supergroup        108 2019-02-08 21:36 /output/20190208213610/part-00000
复制代码


  • 上面看到的**/output/20190208213610/part-00000**就是输出结果,用 cat 命令查看其内容:


[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -cat /output/20190208213610/part-00000(18264,the)(14150,and)(10020,to)(8615,of)(7571,a)(7086,her)(6217,she)(5912,was)(5751,in)(4502,had)
复制代码


  • 可见与前面控制台输出的一致;

  • 在 spark 的 web 页面,可见刚刚执行的任务信息:



  • 至此,第一个 spark 应用的开发和运行就完成了,接下来的文章中,咱们一起来完成更多的 spark 实战;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...


发布于: 刚刚阅读数: 3
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
第一个spark应用开发详解(java版)_Java_程序员欣宸_InfoQ写作社区