写点什么

Spark SQL 之 RDD 转换 DataFrame 的方法

  • 2021 年 12 月 02 日
  • 本文字数:2792 字

    阅读完需:约 9 分钟

Spark SQL之RDD转换DataFrame的方法

RDD 转换 DataFrame 之 Reflection 方法


第一种方式是使用反射的方式,用反射去推导出来 RDD 里面的 schema。这个方式简单,但是不建议使用,因为在工作当中,使用这种方式是有限制的。


对于以前的版本来说,case class 最多支持 22 个字段。如果超过了 22 个字段,我们就必须要自己开发一个类,实现 Product 接口才行。因此这种方式虽然简单,但是不通用;因为生产中的字段是非常非常多的,是不可能只有 20 来个字段的。


//Javaimport org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;


import javax.jnlp.PersistenceService;import javax.xml.crypto.Data;


public class rddtoDFreflectionJava {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("program").master("local").config("spark.sql.warehouse.dir", "file:/Users/zhangjingyu/Desktop/Spark 架构/spark-warehouse").getOrCreate();String Path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";


    JavaRDD<PersonJava> personRDD = Spark.read().textFile(Path).javaRDD().map(line -> {        String name = line.split(",")[0];        Long age = Long.valueOf(line.split(",")[1].trim());        PersonJava person = new PersonJava();        person.setName(name);        person.setAge(age);        return person;    });
复制代码


/**



    */Dataset<Row> personDF = Spark.createDataFrame(personRDD,PersonJava.class);personDF.createOrReplaceTempView("test");Dataset<Row> ResultDF = Spark.sql("select * from test a where a.age < 30");ResultDF.show();


        JavaRDD<PersonJava> ResultRDD = ResultDF.javaRDD().map(line -> {        PersonJava person = new PersonJava();        person.setName(line.getAs("name"));        person.setAge(line.getAs("age"));        return person;    });

    for (PersonJava personJava : ResultRDD.collect()) { System.out.println(personJava.getName()+":"+personJava.getAge()); }
    复制代码


    /**



      */}}


      // Scala: RDD -> DataFrame via reflection — the case class supplies the schema.
object rddtoDFreflectionScala {

  // Pre-2.11 Scala capped case classes at 22 fields, which is why this
  // reflection approach does not scale to wide production schemas.
  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"
    import spark.implicits._

    // Parse "name,age" lines into Person instances; toDF infers the schema
    // from the case class via reflection.
    val personDF = spark.sparkContext.textFile(path)
      .map(row => row.split(","))
      .map(line => Person(line(0), line(1).trim.toLong))
      .toDF

    personDF.createOrReplaceTempView("test")
    val resultDF = spark.sql("select * from test a where a.age > 20")

    // Convert back to an RDD of Person. The article text had lost the type
    // parameters ("getAsString"/"getAsLong"); Row#getAs[T](column) is the API.
    val resultrdd = resultDF.rdd.map(x => {
      val name = x.getAs[String]("name")
      val age = x.getAs[Long]("age")
      Person(name, age)
    })

    for (elem <- resultrdd.collect()) {
      System.out.println(elem.name + " : " + elem.age)
    }
  }
}


      RDD 转换 DataFrame 之 Programmatic(编程指定 Schema)方式


      创建一个 DataFrame,使用编程的方式,这个方式用的非常多。通过编程方式指定 schema ,对于第一种方式的 schema 其实定义在了 case class 里面了。


      //Javaimport org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import scala.Tuple2;


      import java.util.ArrayList;import java.util.List;


      public class rddtoDFprogrammJava {public static void main(String[] args) {


          SparkSession spark = SparkSession            .builder()            .appName("program")            .master("local").config("spark.sql.warehouse.dir", "file:/Users/zhangjingyu/Desktop/Spark架构/spark-warehouse")            .getOrCreate();    String Path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";

      //创建列属性 List<StructField> fields = new ArrayList<>(); StructField structField_name = DataTypes.createStructField("name", DataTypes.StringType, true); StructField structField_age = DataTypes.createStructField("age", DataTypes.LongType, true); fields.add(structField_name); fields.add(structField_age); StructType scheme = DataTypes.createStructType(fields);

      JavaRDD PersonRdd = spark.read().textFile(Path).javaRDD().map(x -> { String[] lines = x.split(","); return RowFactory.create(lines[0], Long.valueOf(lines[1].trim())); });

      Dataset<Row> PersonDF = spark.createDataFrame(PersonRdd, scheme); PersonDF.createOrReplaceTempView("program"); Dataset<Row> ResultDF = spark.sql("select * from program "); ResultDF.show();

      for (Row row : ResultDF.javaRDD().collect()) { System.out.println(row); }}
      复制代码


      }


      //Scala


      // Scala: RDD -> DataFrame with a programmatically built schema (StructType).
// (The article's import line was fused into "...sql.Rowimport org.apache..." —
// split back into two valid imports.)
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object rddtoDFprogrammScala {
  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"

    // Schema built programmatically instead of inferred from a case class.
    val scheme = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", LongType, true)))

    // Parse each "name,age" line into a generic Row matching the schema.
    val rdd = spark.sparkContext.textFile(path)
      .map(line => line.split(","))
      .map(x => Row(x(0), x(1).trim.toLong))

    val personDF = spark.createDataFrame(rdd, scheme)
    personDF.createOrReplaceTempView("person")
    val resultDF = spark.sql("select * from person a where a.age < 30")

    for (elem <- resultDF.collect()) {
      System.out.println(elem.get(0) + ":" + elem.get(1))
    }
  }
}


      原创作者:张景宇

      用户头像

      关注尚硅谷,轻松学IT 2021.11.23 加入

      还未添加个人简介

      评论

      发布
      暂无评论
      Spark SQL之RDD转换DataFrame的方法