写点什么

SparkSQL 的入门实践教程

  • 2022 年 1 月 26 日
  • 本文字数:13640 字

    阅读完需:约 45 分钟

摘要:Spark SQL 是用于处理结构化数据的模块。与 Spark RDD 不同的是,Spark SQL 提供数据的结构信息(源数据)和性能更好,可以通过 SQL 和 DataSet API 与 Spark SQL 进行交互。


本文分享自华为云社区《【SparkSQL笔记】SparkSQL的入门实践教程(一)》,作者:Copy 工程师。

1.Spark SQL 概述​


Spark SQL 是用于处理结构化数据的模块。与 Spark RDD 不同的是,Spark SQL 提供数据的结构信息(源数据)和性能更好,可以通过 SQL 和 DataSet API 与 Spark SQL 进行交互。

2.Spark SQL 编程入门​


Spark SQL 模块的编程主入口点是 SparkSession,SparkSession 对象不仅为用户提供了创建 DataFrame 对象、读取外部数据源并转化为 DataFrame 对象以及执行 sql 查询的 API,还负责记录着用户希望 Spark 应用如何在 Spark 集群运行的控制、调优参数,是 Spark SQL 的上下文环境,是运行的基础。

2.1 创建 SparkSession

SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();
复制代码


master("local")new SparkConf().setMaster("local")一个样子,SparkSession包含了SparkContextSqlContext等,是更强大的入口对象,也是更统一的入口。

appName("SparkSQLDemo1")设置任务名称

config():设置配置属性,并且有多个重载方法:

ublic synchronized SparkSession.Builder config(String key, String value)public synchronized SparkSession.Builder config(String key, long value)public synchronized SparkSession.Builder config(String key, double value)public synchronized SparkSession.Builder config(String key, boolean value)public synchronized SparkSession.Builder config(SparkConf conf)
复制代码


Spark 2.0 中的 SparkSession 为 Hive 提供了强大的内置支持,包括使用 HiveQL 编写查询语句,访问 Hive UDF 以及从 Hive 表读取数据的功能。若是仅以学习为目的去测试这些功能时,并不需要在集群中特意安装 Hive 即可在 Spark 本地模式下测试 Hive 支持。

2.2 创建 DataFrame


SparkSession 对象提供的 API,可以从现有的 RDD,Hive 表或其他结构化数据源中创建 DataFrame 对象。


在这里说明一下,DataSet 是 DataFrame 的替代品,比 DataFrame 更强大。DataFrame 等价于 DataSet[Row]


public static void main(String[] args) {    SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();    System.out.println(sparkSession.version());    Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");	//show方法是展示所有的数据,也可以show(int rownums) 展示前N条数据    json.show();    sparkSession.close();}
复制代码


样例数据:


{"id":1,"name":"小红","age":"19","phone":"111"}{"id":2,"name":"王明","age":"20","phone":"222"}{"id":3,"name":"诸葛亮","age":"21","phone":"333"}{"id":4,"name":"王茂","age":"23","phone":"444"}{"id":5,"name":"三毛","age":"17","phone":"555"}{"id":6,"name":"老张","age":"16","phone":"666"}
复制代码


日志打印:


INFO  2019-11-25 20:26 - org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator[main] - Code generated in 18.6628 ms+---+---+----+-----+|age| id|name|phone|+---+---+----+-----+| 19|  1|  小红|  111|| 20|  2|  王明|  222|| 21|  3| 诸葛亮|  333|| 23|  4|  王茂|  444|| 17|  5|  三毛|  555|| 16|  6|  老张|  666|+---+---+----+-----+
INFO 2019-11-25 20:26 - org.spark_project.jetty.server.ServerConnector[main] - Stopped Spark@23db87{HTTP/1.1}{0.0.0.0:4040}
复制代码


可以看到,已经解析了 json 数据文件,并且还解析 json 中的字段名称,解析成表的字段名称,而且如果你的 json 的 key 值中有不一致的,都会解析成字段名称,只不过没有值的默认为 null


例如:



日志打印:



看到没有,所有的不同 key 值都有。

2.3 DataFrame 基本操作


​DataFrame 为我们提供了灵活、强大且底层自带优化的 API,例如 select、where、orderBy、groupBy、limit、union 这样的算子操作,DataFrame 提供这一系列算子对开发者来说非常熟悉,而 DataFrame 正是将 SQL select 语句的各个组成部分封装为同名 API,用以帮助程序员通过 select、where、orderBy 等 DataFrame API 灵活地组合实现 sql 一样的逻辑表达。因此,DataFrame 编程仅需像 SQL 那样简单地对计算条件、计算需求、最终所需结果进行声明式的描述即可,而不需要像 RDD 编程那样一步步地对数据集进行原始操作。


DataFrame API 的使用实例(以上面的 json 数据为例):


1、以树形格式输出 DataSet 对象的结构信息


Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");// 展示DataSet结构信息json.printSchema();
复制代码


日志打印:


root |-- age: string (nullable = true) |-- id: long (nullable = true) |-- name: string (nullable = true) |-- phone: string (nullable = true)
复制代码


表的结构信息已经按照树形结构出来了,是根据 json 文件的 value 值判定的。


2、通过 DataSet 的 Select()方法查询数据集中一列或者多列


SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");// 定义字段Column id = new Column("id");Column name = new Column("name");// 查询单个字段json.select("id").show();// 查询多个字段json.select(id,name).show();// 关闭saprkSesison 这里的close和stop是一个样 2.1.X开始用close 2.0.X使用的stopsparkSession.close();
复制代码


Select 还有一个 select(String col, String... cols)第一个参数不是很明白,后面再补充吧。还有这里有个对象 Column 是非常重要的,以后所有的 java 开发 sparkSQL 都会用到这个对象,这个对象就是我们在数据库中用到的字段,并且该对象有丰富的方法对字段做操作。


3、组合使用 DataSet 对象的 select(),where(),orderBy()方法查找 id 大于 3 的同学的 id,姓名,年龄以及电话,按 id 的降序排列


这里为了更能直观的展示结果,我修改了 json 文件,添加了几行数据:


{"id":1,"name":"小红","age":"19","phone":"111"}{"id":2,"name":"王明","age":"20","phone":"222"}{"id":3,"name":"诸葛亮","age":"21","phone":"333"}{"id":4,"name":"王茂","age":"23","phone":"444"}{"id":5,"name":"三毛","age":"17","phone":"555"}{"id":6,"name":"老张","age":"16","phone":"666"}{"id":7,"name":"张好","age":"16","phone":"777"}{"id":8,"name":"王流","age":"16","phone":"888"}
复制代码


where 条件查询有两种形式:


public Dataset<T> where(Column condition)public Dataset<T> where(String conditionExpr)
复制代码


第一种是通过 Column 对象操作条件查询,第二种是通过直接写条件查询


// 条件查询// id > 3Column id = new Column("id").gt(3);// age = 16Column name = new Column("age").equalTo("16");// id > 3 and age =16Column select = id.and(name);// 直接书写条件json.select("id","name","age","phone").where("id > 3 and age = 16").orderBy(new Column("id").desc()).show();// 通过多个where生成 id > 3 and age =16json.select("id","name","age","phone").where(id).where(name).orderBy(new Column("id").desc()).show();// 通过Column操作转换得到 id > 3 and age =16json.select("id","name","age","phone").where(select).orderBy(new Column("id").desc()).show();
复制代码


这三个的写法是一样的,结果也是一样的。



4、image.png 使用 DataSet 对象提供的 groupBy()方法进而学生年龄分布


// 分组查询// 单个字段分组查询json.groupBy(new Column("age")).count().show();// 多个字段分组查询ArrayStack<Column> stack = new ArrayStack<>();stack.push(new Column("id"));stack.push(new Column("age"));json.groupBy(stack).count().show();// 多个字段分组查询json.groupBy(new Column("id"),new Column("age")).count().show();
复制代码


group()分组有很多形式:



至于你想怎么写,只要正确就可以。




如果你想对字段操作,比如我们经常会这样写 sqlselect age+1 from student,呢么 sparkSQL 完全可以实现,只需要这样既可:new Column("age").plus(1) 这就代表着 age + 1


上面的实例中很好地展示了通过灵活组合使用 DataSet 提供的 API 可以实现 SQL 一样清晰简明的逻辑表达,如果采用 RDD 编程,首先 RDD 对 JSON 这种文件格式并不敏感,会像读取文本文件一样按行读取 JSON 文件,转化为 RDD[String],而不会像 DataSet 那样自动解析 JSON 格式数据并且自动推断出结构信息(Schema),因此我们必须在程序中首先实例化一个 JSON 解析器用于解析 JSON 字符串得到真实数据组成的数组,实际是将 RDD[String]转化为由一行行记录着多个共有字段数值的数组组成的 RDD[Array[String]],进而使用 map、filter、takeOrdered、distinct、union 等 RDD 算子操作进行具体一步步地数据操作来实现业务逻辑。


相比之下,我们看出有时候同样的数据量,同样的分析需求,用 RDD 编程实现不仅代码量更大,而且会极有可能因为程序员不良操作加重集群的开销,而采用 DataFrame API 组合编程有时仅需一行代码即可实现复杂的分析需求。

2.4 执行 SQL 查询


SparkSession 为用户提供了直接执行 sql 语句的 SparkSession.sql(String sqlText)方法,sql 语句可直接作为字符串传入 sql()方法中,sql 查询所得到结果依然为 DataFrame 对象。在 Spark SQL 模块上直接执行 sql 语句的查询需要首先将标志着结构化数据源的 DataSet 对象注册成临时表,进而在 sql 语句中对该临时表进行查询操作,具体步骤如下例所示:


SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");// 注册临时表json.createOrReplaceTempView("student");// sql查询 用select * from student 也可以Dataset<Row> sql = sparkSession.sql("select id,name,age,phone from student");sql.show();// 关闭saprkSesison 这里的close和stop是一个样 2.1.X开始用close 2.0.X使用的stopsparkSession.close();
复制代码


结果显示:



由上述操作,可看出 DataSet 是 Spark SQL 核心的数据抽象,读取的数据源需要转化成 DataSet 对象,才能利用 DataSet 各种 API 进行丰富操作,也可将 DataSet 注册成临时表,从而直接执行 SQL 查询,而 DataFrame 上的操作之后返回的也是 DataFrame 对象。


另外,因为本小结所讲述的是如何通过 SparkSession 提供的 SQL 接口直接进行 SQL 查询,而关于具体完成业务需求所需的 SQL 语句如何来编写,大家可以直接百度查询相关 SQL 教程进行学习。Spark SQL 的 SQL 接口全面支持 SQL 的 select 标准语法,包括 SELECT DISTINCT、from 子句、where 子句、order by 字句、group by 子句、having 子句、join 子句,还有典型的 SQL 函数,例如 avg()、count()、max()、min()等,除此之外,Spark SQL 在此基础上还提供了大量功能强大的可用函数,可嵌入 sql 语句中使用,有聚合类函数、时间控制类函数、数学统计类函数、字符串列控制类函数等,感兴趣或有这方面分析需求的读者具体可查看官方文档http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql

2.5 全局临时表


​全局临时表(globe temporary view )于临时表(temporary view)是相对的,全局临时表的作用范围是某个 Spark 应用程序内所有会话(SparkSession),它会持续存在,在所有会话中共享,直到该 Spark 应用程序终止


因此,在同一个应用中,在不同的 session 中都需要用到一张临时表,呢么该临时表可以注册为全局临时表,避免多余 I/O,提高系统执行效率,当然如果某个临时表只在整个应用中的某个 session 中使用,仅需要注册为局部临时表,避免不必要的内存存储全局临时表


注意,全局临时表与系统保留的数据库 global_temp 相关联,引用时需要使用 global_temp 标识。


实例:


SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();Dataset<Row> json = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student.json");// 注册临时表json.createOrReplaceTempView("student");// sql查询Dataset<Row> sql = sparkSession.sql("select * from student");sql.show();// 注册为全局临时表try {    json.createGlobalTempView("student_glob");} catch (AnalysisException e) {    e.printStackTrace();}// 当前session查询全局临时表Dataset<Row> sqlGlob = sparkSession.sql("select * from global_temp.student_glob");sqlGlob.show();
// 创建新的SparkSeesion 查询全局临时表Dataset<Row> newSqlGlob = sparkSession.newSession().sql("select * from global_temp.student_glob");newSqlGlob.show();
// 关闭saprkSesison 这里的close和stop是一个样 2.1.X开始用close 2.0.X使用的stopsparkSession.close();
复制代码


2.6 DataSet 实现 WordCount


​Dataset[T]中对象的序列化并不使用 Java 标准序列化或 Kryo,而是使用专门的编码器对对象进行序列化以便通过网络进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是根据 Dataset[T]的元素类型(T)动态生成,并且允许 Spark 无须将字节反序列化回对象的情况下即可执行许多操作(如过滤、排序和散列),因此避免了不必要的反序列化导致的资源浪费,更加高效。


SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();Dataset<String> stringDataset = sparkSession.read().textFile("D:\\sparksqlfile\\jsondata\\word.txt");// 分割每行的字符串Dataset<String> dataset = stringDataset.flatMap(new FlatMapFunction<String, String>() {    @Override    public Iterator<String> call(String s) throws Exception {        String[] split = s.split("\t", -1);        return Arrays.asList(split).iterator();    }},Encoders.STRING());// 根据key值分组KeyValueGroupedDataset<String, String> groupByKey = dataset.groupByKey(new MapFunction<String, String>() {    @Override    public String call(String value) throws Exception {        return value.toLowerCase();    }},Encoders.STRING());// 求和展示数据groupByKey.count().show();
// 关闭saprkSesison 这里的close和stop是一个样 2.1.X开始用close 2.0.X使用的stopsparkSession.close();
复制代码


日志打印:



元数据:


im	runnig	manyou	are	yes	hahayes	you	wein	im	niuyou	are	yes	hahayou	are	yes	haha
复制代码

2.7 将 RDDs 转化为 DataFrame


​除了调用 SparkSesion.read().json/csv/orc/parquet/jdbc 方法从各种外部结构化数据源创建 DataFrame 对象外,Spark SQL 还支持将已有的 RDD 转化为 DataSet 对象,但是需要注意的是,并不是由任意类型对象组成的 RDD 均可转化为 DataSet 对象,只有当组成 RDD[T]的每一个 T 对象内部具有公有且鲜明的字段结构时,才能隐式或显式地总结出创建 DataSet 对象所必要的结构信息(Schema)进行转化,进而在 DataSet 上调用 RDD 所不具备的强大丰富的 API,或执行简洁的 SQL 查询。


Spark SQL 支持将现有 RDDs 转换为 DataSet 的两种不同方法,其实也就是隐式推断或者显式指定 DataSet 对象的 Schema。


实例数据:


王明	13	17865321121	南京市天龙寺小区一栋	五年级一班刘红	14	15643213452	南京市天龙寺小区二栋	五年级一班张三	15	15678941247	南京市天龙寺小区三栋	五年级二班诸葛刘芳	14	14578654123	南京市天龙寺小区一栋	五年级一班
复制代码


1、使用反射机制(Reflection)推理出 schema 结构信息


第一种将 RDDs 转化为 DataFrame 的方法是使用 SparkSQL 内部反射机制自动推断包含特定类型对象的 RDD 的 schema(RDD 的结构信息)进行隐士转化。采用这种方式转化为 DataSet 对象,往往是因为被转化的 RDD[T]所包含的 T 对象本身就是具有典型一维表严格的字段结构的对象,因此 SparkSQL 很容易就可以自动推断出合理的 Schema。这种基于反射机制隐式地创建 DataSet 的方法往往仅需要简洁的代码即可完成转化,并且运行效果良好。


SparkSQL 的 Scala 接口支持自动包含样例类(case class)对象的 RDD 转换为 DataSet 对象。在样例类的声明中已预先定义了表的结构信息,内部通过反射机制即可读取样例类的参数的名称,类型,转化为 DataSet 对象的 Schema。样例类不仅可以包含 Int,Double,String,这样的简单数据类型,也可以嵌套或包含复杂类型,例如 Seq 或 Arrays。


实例:将学生样例对象的 RDD 隐式转换为 DataSet 对象


public static void main(String[] args) {    SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();    // 读取文件转成JavaRDD    JavaRDD<String> stringRDD = sparkSession.sparkContext().textFile("D:\\sparksqlfile\\jsondata\\student.txt",1).toJavaRDD();    // JavaRDD<String> 转为 JavaRDD<Person>    JavaRDD<Person> personRDD = stringRDD.map(new Function<String, Person>() {        @Override        public Person call(String v1) {            String[] split = v1.split("\t", -1);            return new Person(split[0],Integer.valueOf(split[1]),split[2],split[3],split[4]);        }    });    // RDD 转换为 DataSet    Dataset<Row> personDataSet = sparkSession.createDataFrame(personRDD, Person.class);    personDataSet.show();    // 注册临时表    personDataSet.createOrReplaceTempView("person");    // 查询临时表    Dataset<Row> selectDataSet = sparkSession.sql("select * from person where age between 14 and 15");    selectDataSet.show();    // 遍历DataSet 通过下标获取 name的Ds    Dataset<String> nameDs = selectDataSet.map(new MapFunction<Row, String>() {        @Override        public String call(Row value) {            // Row 的 字段排序是按照字典排序的 所以 第四个才是name字段            return "name:"+value.getString(3);        }    }, Encoders.STRING());    nameDs.show();
// Row通过指定字段名获取字段值 返回Object对象 Dataset<String> nameDs2 = selectDataSet.map(new MapFunction<Row, String>() { @Override public String call(Row value) { return "name:"+value.getAs("name"); } }, Encoders.STRING()); nameDs2.show();
// 关闭saprkSesison 这里的close和stop是一个样 2.1.X开始用close 2.0.X使用的stop sparkSession.close();}
复制代码


日志截图:



2、开发者指定 Schema


RDD 转化为 Dataset 的第二种方法是通过编程接口,允许先创建一个 schema,然后将其应用到现有的 RDD[Row],较前一种方法由样例类或基本数据类型(Int,String)对象组成的 RDD 通过 sparkSession.createDataFrame 直接隐式转换为 iDataset 不同,不仅需要根据需求以及数据结构构建 schema,而且需要将 RDD[T]转化为 Row 对象组成的 RDD[Row],这样方法虽然代码多了一些,但也提供了更高的自由度和灵活性。


当 case 类不能提前定义时(例如:数据集结构信息已经包含在每一行,一个文本数据集的字段对不同用户来说需要被解析成不同的字段名),这时就可以通过以下三部完成 Dataset 的转换:


(1):根据需求从源 RDD 转化为 RDD of Rows


(2):创建由符合在步骤 1 中创建的 RDD 中的 Rows 结构的 StructType 表示的模式。


(3):通过 SparkSession 提供的 createDataFrame 方法将模式应用于行的 RDD。


由此可见,将 RDD 转化为 Dataset 的实质就是,赋予 RDD 内部包含特定类型对象的结构信息,使 Dataset 掌握更丰富的结构与信息(可以理解为传统数据库的表头,表头包含个字段名称,类型等信息),如此一来,便更好地说明 Dateset 支持 sql 查询了。


实例:


SparkSession sparkSession = SparkSession.builder().master("local").appName("SparkSQLDemo1").config("spark.testing.memory", 471859200).getOrCreate();// 读取文件转成JavaRDDJavaRDD<String> peopleRDD = sparkSession.sparkContext().textFile("D:\\sparksqlfile\\jsondata\\student.txt",1).toJavaRDD();
String[] schemaString = {"name", "age"};// 创建自定义schemaList<StructField> fields = new ArrayList<>();for (String s : schemaString) { fields.add(DataTypes.createStructField(s,DataTypes.StringType,true));}StructType schema = DataTypes.createStructType(fields);// JavaRDD<String> 转为JavaRDD<Row>行记录JavaRDD<Row> rowRdd = peopleRDD.map(new Function<String, Row>() { @Override public Row call(String v1) { String[] split = v1.split("\t", -1); return RowFactory.create(split[0],split[1]); }});// JavaRDD转DataSetDataset<Row> personDataset = sparkSession.createDataFrame(rowRdd, schema);personDataset.show();
复制代码

2.8 用户自定义函数

除了利用 Dataset 丰富的内置函数变成外,还可以自己编程满足特定分析需求的用户自定义函数(UDF)并加以使用,SparkSQL 中主要支持创建用户自定义无类型聚合函数和用户自定义强类型聚合函数


1、用户自定义无类型聚合函数


用户自定义的无类型聚合函数必须继承 UserDefinedAggregateFunction 抽象类,进而重写父类中的抽象成员变量和成员方法。其实重写父类抽象成员变量,方法的过程即是实现用户自定义函数的输入,输出规范以及计算逻辑的过程。


实例:求取平均值的函数


UDF 函数代码:


public class MyAverage extends UserDefinedAggregateFunction {
private StructType inputSchema; private StructType bufferSchema;
public MyAverage() { ArrayList<StructField> inputFields = new ArrayList<>(); inputFields.add(DataTypes.createStructField("inputColumn",DataTypes.LongType,true)); inputSchema = DataTypes.createStructType(inputFields);
ArrayList<StructField> bufferFields = new ArrayList<>(); bufferFields.add(DataTypes.createStructField("sum",DataTypes.LongType,true)); bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType,true)); bufferSchema = DataTypes.createStructType(bufferFields);
} // Data types of input arguments of this aggregate function // 聚合函数输入参数的数据类型(其实是该函数所作用的Dataset指定列的数据类型) @Override public StructType inputSchema() { return inputSchema;
} // Data types of values in the aggregation buffer // 聚合函数的缓冲器结构,返回之前定义了用于记录累加值和累加数的字段结构 @Override public StructType bufferSchema() { return bufferSchema; } // The data type of the returned value // 聚合函数返回值的数据类型 @Override public DataType dataType() { return DataTypes.DoubleType; } // Whether this function always returns the same output on the identical input // 此函数是否始终在相同输入上返回相同输出 @Override public boolean deterministic() { return true; } // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. // 初始化给定的buffer聚合缓冲器 // buffer 聚合缓冲器其本身是一个Row对象,因此可以调用其标准方法访问buffer内的元素,例如在索引处检索一个值 @Override public void initialize(MutableAggregationBuffer buffer) { buffer.update(0,0L); buffer.update(1,0L); } // Updates the given aggregation buffer `buffer` with new input data from `input` @Override public void update(MutableAggregationBuffer buffer, Row input) { if (!input.isNullAt(0)){ long updatedSum = buffer.getLong(0) + input.getLong(0); long updatedCount = buffer.getLong(1)+1; buffer.update(0,updatedSum); buffer.update(1,updatedCount); } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { long mergedSum = buffer1.getLong(0) + buffer2.getLong(0); long mergedCount = buffer1.getLong(1) + buffer2.getLong(1); buffer1.update(0,mergedSum); buffer1.update(1,mergedCount); } @Override public Object evaluate(Row buffer) { return ((double)buffer.getLong(0))/buffer.getLong(1); }}
复制代码


运行代码:


public static void main(String[] args) {    SparkSession sparkSession = SparkSession.builder().master("local").appName("XXXXXXXXXX").config("spark.testing.memory", 471859200).getOrCreate();    // 读取文件    Dataset<Row> df = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student5.json");    // 注册自定义函数    sparkSession.udf().register("myAverage",new MyAverage());    // 显示原始数据    df.createOrReplaceTempView("student");    df.show();    // 使用自定义UDF求平均值    Dataset<Row> result = sparkSession.sql("SELECT myAverage(age) as average_salary FROM student");    result.show();}
复制代码


日志打印:



2、用户自定义强类型聚合函数


用户自定义强类型聚合函数需继承 Aggregator 抽象类,同样需要重写父类抽象方法(reduce,merge,finish)以实现自定义聚合函数的计算逻辑。用户定义的强类型聚合函数相比于前一种 UDF,内部与特定数据集的数据类型紧密结合,增强了紧密型,安全性,但降低了适用性。


实例:求用户平均值的强类型聚合函数


数据实体类:


// 定义Employee样例类型规范聚合函数输入数据的数据类型public class Employee implements Serializable {    private String name;    private long age;    private String sex;    private String institute;    private String phone;    public Employee() {    }
public Employee(String name, long age, String sex, String institute, String phone) { this.name = name; this.age = age; this.sex = sex; this.institute = institute; this.phone = phone; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public long getAge() { return age; }
public void setAge(long age) { this.age = age; }
public String getSex() { return sex; }
public void setSex(String sex) { this.sex = sex; }
public String getInstitute() { return institute; }
public void setInstitute(String institute) { this.institute = institute; }
public String getPhone() { return phone; }
public void setPhone(String phone) { this.phone = phone; }
@Override public String toString() { return "Employee{" + "name='" + name + '\'' + ", age=" + age + ", sex='" + sex + '\'' + ", institute='" + institute + '\'' + ", phone='" + phone + '\'' + '}'; }}
复制代码


定义聚合函数缓冲器:


// 定义Average样例类规范buffer聚合缓冲器的数据类型public class Average implements Serializable {    private long sum;    private long count;
public Average() { }
public Average(long sum, long count) { this.sum = sum; this.count = count; }
public long getSum() { return sum; }
public void setSum(long sum) { this.sum = sum; }
public long getCount() { return count; }
public void setCount(long count) { this.count = count; }
@Override public String toString() { return "Average{" + "sum=" + sum + ", count=" + count + '}'; }}
复制代码


UDF 代码:


// 用户自定义的强类型聚合函数必须继承Aggregator抽象类,注意需要传入聚合函数输入数据,buffer缓冲器以及返回的结果的泛型参数public class MyAverage2 extends Aggregator<Employee,Average,Double> {    // A zero value for this aggregation. Should satisfy the property that any b + zero = b    // 定义聚合的零值,应该满足任何b + zero = b    @Override    public Average zero() {        return new Average(0L, 0L);    }
// Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object // 定义作为Average对象的buffer聚合缓冲器如何处理每一条输入数据(Employee对象)的聚合逻辑, // 与上例的求取平均值的无类型聚合函数的update方法一样,每一次调用reduce都会更新buffer聚合函数的缓冲器 // 并将更新后的buffer作为返回值 @Override public Average reduce(Average buffer, Employee employee) { long newSum = buffer.getSum() + employee.getAge(); long newCount = buffer.getCount() + 1; buffer.setSum(newSum); buffer.setCount(newCount); return buffer; }
// Merge two intermediate values // 与上例的求取平均值的无类型聚合函数的merge方法实现的逻辑相同 @Override public Average merge(Average b1, Average b2) { long mergeSum = b1.getSum() + b2.getSum(); long mergeCount = b1.getCount() + b2.getCount(); b1.setSum(mergeSum); b1.setCount(mergeCount); return b1; }
// Transform the output of the reduction // 定义输出结果的逻辑,reduction表示buffer聚合缓冲器经过多次reduce,merge之后的最终聚合结果 // 仍为Average对象记录着所有数据的累加,累加次数 @Override public Double finish(Average reduction) { System.out.println("////////////////"+((double) reduction.getSum()) / reduction.getCount()); return ((double)reduction.getSum())/reduction.getCount(); }
// Transform the output of the reduction // 指定中间值的编码器类型 @Override public Encoder<Average> bufferEncoder() { return Encoders.bean(Average.class); }
// Specifies the Encoder for the final output value type // 指定最终输出的编码器类型 @Override public Encoder<Double> outputEncoder() { return Encoders.DOUBLE(); }}
复制代码


运行代码:


public static void main(String[] args) {    SparkSession sparkSession = SparkSession.builder().master("local").appName("XXXXXXXXXX").config("spark.testing.memory", 471859200).getOrCreate();    Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);    // 读取文件    Dataset<Employee> employeeDataset = sparkSession.read().json("D:\\sparksqlfile\\jsondata\\student5.json").as(employeeEncoder);    employeeDataset.show();
// 将函数转换为'TypedColumn' 并给他一个名字 MyAverage2 myAverage2 = new MyAverage2(); TypedColumn<Employee, Double> average_salary = myAverage2.toColumn().name("average_salary"); // 使用自定义强类型UDF求平均值 Dataset<Double> result = employeeDataset.select(average_salary); result.show();}
复制代码


日志:



点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
SparkSQL的入门实践教程