写点什么

用什么承受全部的数据 -RDD、DataFrame 还是 Dataset

  • 2022 年 7 月 14 日
  • 本文字数:2564 字

    阅读完需:约 8 分钟

大家好,我是怀瑾握瑜,一只大数据萌新,家有两只吞金兽,嘉与嘉,上能 code 下能 teach 的全能奶爸<br/>如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~



前言

Spark 提供了三种主要的与数据相关的 API:


  • RDD:全称 Resilient Distributed Dataset,弹性分布式数据集,Spark 中最基础的数据抽象,特点是 RDD 只包含数据本身,没有数据结构。

  • DataFrame:也是一个分布式数据容器,除数据本身,还记录了数据的结构信息,即 schema;结构信息便于 Spark 知道该数据集中包含了哪些列,每一列的类型和数据是什么。

  • DataSet:Spark 中最上层的数据抽象,不仅包含数据本身,记录了数据的结构信息 schema,还包含了数据集的类型,也就是真正把数据集做成了一个 java 对象的形式,需要先创建一个样例类 case class,把数据做成样例类的格式,每一列就是样例类里的属性。


从版本的产生上来看:


RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)


如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中,DataSet 有可能会逐步取代 RDD 和 DataFrame 成为唯一的 API 接口。

三者的共性

  1. RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利

  2. 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到 Action 如 foreach 时,三者才会开始遍历运算

  3. 三者有许多共同的函数,如 filter,排序等

  4. 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)

  5. 三者都有 partition 的概念

  6. DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

三者的区别

RDD

  1. RDD 一般和 spark mlib 同时使用

  2. RDD 不支持 sparksql 操作

DataFrame

  1. 与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,只有通过解析才能获取各个字段的值,每一列的值没法直接访问

  2. DataFrame 与 Dataset 一般不与 spark ml 同时使用

  3. DataFrame 与 Dataset 均支持 sparksql 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作

  4. DataFrame 与 Dataset 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然

  5. 处理支持结构或者非结构化的格式(比如 Avro, CSV, elastic search, 以及 Cassandra)以及不同的文件系统(HDFS, HIVE tables, MySQL, etc)。它支持非常多的数据源

  6. 使用 Spark 的 SQL 可以无修改的支持 Hive 查询在已经存在的 Hive warehouses。它重用了 Hive 的前端、MetaStore 并且对已经存在的 Hive 数据、查询和 UDF 提供完整的兼容性。

Dataset

  1. 使用 Spark 的 SQL 可以无修改的支持 Hive 查询在已经存在的 Hive warehouses。它重用了 Hive 的前端、MetaStore 并且对已经存在的 Hive 数据、查询和 UDF 提供完整的兼容性。

  2. Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同

  3. DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用 getAS 方法拿出特定字段

JAVA 中的相互转换

直接构建 JavaRDD<Model>

JavaRDD<Model> rdd = jsc.parallelize(list);rdd.foreach(x -> System.out.println(x.toString()));
复制代码

直接构建 Dataset<Model>

Encoder<Model> encoder = Encoders.bean(Model.class);Dataset<Model> ds = spark.createDataset(list, encoder);ds.show();ds.printSchema();
复制代码

直接构建 Dataset<Row>

Dataset<Row> df = spark.createDataFrame(list, Model.class);df.show();df.printSchema();
复制代码

JavaRDD<Model> => Dataset<Model>

ds = spark.createDataset(rdd.rdd(), encoder);ds.show();ds.printSchema();
复制代码

JavaRDD<Model> => Dataset<Row>

df = spark.createDataFrame(rdd, Model.class);df.show();df.printSchema();
复制代码

JavaRDD<Row>到 Dataset<Row>

JavaRDD<Row> rowRdd = rdd.map(x -> new Model(id));List<StructField> fieldList = new ArrayList<>();fieldList.add(DataTypes.createStructField("id", DataTypes.IntegerType, false));StructType schema = DataTypes.createStructType(fieldList);df = spark.createDataFrame(rowRdd, schema);df.show();df.printSchema();
复制代码

Dataset<Model> => JavaRDD<Model>

rdd = ds.toJavaRDD();rdd.foreach(x -> System.out.println(x.toString()));
复制代码

Dataset<Row> => JavaRDD<Model>

rdd = df.toJavaRDD().map(row -> {    String id = row.getAs("id");    return new Model(id);});rdd.foreach(element -> System.out.println(element.toString()));
复制代码

Dataset<Model> => Dataset<Row>

List<StructField> fieldList = new ArrayList<>();fieldList.add(DataTypes.createStructField("id", DataTypes.IntegerType, false));StructType rowSchema = DataTypes.createStructType(fieldList);ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);Dataset<Row> df = ds.map(        (MapFunction<Model, Row>) x -> {            List<Object> objectList = new ArrayList<>();            objectList.add(x.id);            return RowFactory.create(objectList.toArray());        },        rowEncoder);df.show();
复制代码

Dataset<Row> => Dataset<Model>

ds= df.map(new MapFunction<Row, Model>() {    @Override    public Model call(Row value) throws Exception {        return new Model(value.getAs("id"));    }}, encoder);ds.show();ds.printSchema();
复制代码

转换总结:

其实 RDD 的 Map 和 Dataset 的 Map 只有一点不同,就是 Dataset 的 Map 要指定一个 Encoder 的参数。


列一个最简单的数据查询,到落盘的简单流程


数据处理:


// 通过sql查询数据,生成dfDataset<Row> df = spark.sql(sql);// 遍历数据生成rddJavaRDD<Model> rdd = basicList.toJavaRDD().mapPartitions();//创建临时表保存数据// 通过bean生成dfDataset<Row> dfds = spark.createDataFrame(rdd, Model.class);dfds.createOrReplaceTempView("spark_tmp");spark.sql("insert XXX SELECT FROM spark_tmp");
复制代码



结束语

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2022.07.01 加入

还未添加个人简介

评论

发布
暂无评论
用什么承受全部的数据-RDD、DataFrame还是Dataset_spark_怀瑾握瑜的嘉与嘉_InfoQ写作社区