Spark SQL: Converting an RDD to a DataFrame
Converting an RDD to a DataFrame via Reflection
The first approach uses reflection to infer the RDD's schema from its element type. It is simple, but it is not the recommended choice, because in real work it runs into limitations.
In older Scala versions a case class could carry at most 22 fields; beyond that, you had to write your own class and implement the Product trait yourself. So although this approach is simple, it is not general-purpose: production records routinely have far more than twenty-odd fields.
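For reference, a minimal sketch of what that workaround looked like: a plain class implementing Product by hand. The class name and fields here are hypothetical, and only two fields are shown where a real record would have dozens.

//Scala
// Hypothetical WidePerson: a plain class implementing Product by hand, the old
// workaround when a record had more fields than a case class could carry.
class WidePerson(val name: String, val age: Long /* ...dozens more fields... */)
  extends Product with Serializable {

  def productArity: Int = 2 // total number of fields

  def productElement(n: Int): Any = n match {
    case 0 => name
    case 1 => age
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  def canEqual(that: Any): Boolean = that.isInstanceOf[WidePerson]
}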
//Java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class rddtoDFreflectionJava {

    // JavaBean used for reflection-based schema inference: Spark reads the
    // getters to derive the columns "name" and "age".
    public static class PersonJava implements java.io.Serializable {
        private String name;
        private long age;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public long getAge() { return age; }
        public void setAge(long age) { this.age = age; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("program")
                .master("local")
                .config("spark.sql.warehouse.dir", "file:/Users/zhangjingyu/Desktop/Spark 架构/spark-warehouse")
                .getOrCreate();
        String path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";

        // Parse each "name, age" line of people.txt into a PersonJava bean.
        JavaRDD<PersonJava> personRDD = spark.read().textFile(path).javaRDD()
                .map(new Function<String, PersonJava>() {
                    @Override
                    public PersonJava call(String line) throws Exception {
                        String[] parts = line.split(",");
                        PersonJava person = new PersonJava();
                        person.setName(parts[0]);
                        person.setAge(Long.parseLong(parts[1].trim()));
                        return person;
                    }
                });

        // Reflection: the schema is inferred from the PersonJava bean class.
        Dataset<Row> personDF = spark.createDataFrame(personRDD, PersonJava.class);
        personDF.createOrReplaceTempView("test");
        Dataset<Row> resultDF = spark.sql("select * from test a where a.age < 30");
        resultDF.show();

        // The result can be taken back to an RDD and processed row by row.
        resultDF.javaRDD().foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                System.out.println(row.getAs("name") + ":" + row.getAs("age"));
            }
        });
    }
}
//Scala
object rddtoDFreflectionScala {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"
    import spark.implicits._

    // Reflection: the schema is inferred from the Person case class.
    val personDF = spark.sparkContext.textFile(path)
      .map(row => row.split(","))
      .map(line => Person(line(0), line(1).trim.toLong))
      .toDF()

    personDF.createOrReplaceTempView("test")
    val resultDF = spark.sql("select * from test a where a.age > 20")

    // Rows can be mapped back to case-class instances by column name.
    val resultrdd = resultDF.rdd.map(x => {
      val name = x.getAs[String]("name")
      val age = x.getAs[Long]("age")
      Person(name, age)
    })
    resultrdd.foreach(println)
  }
}
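For reference, the people.txt that ships with the Spark distribution contains three records (Michael, 29; Andy, 30; Justin, 19), so the age > 20 filter above keeps Michael and Andy.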
Converting an RDD to a DataFrame Programmatically
The second approach builds the DataFrame programmatically, specifying the schema yourself; this approach is used very widely. With the reflection approach the schema was effectively defined inside the case class; here you construct it explicitly, as the examples below show.
//Java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class rddtoDFprogrammJava {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("program")
                .master("local")
                .getOrCreate();
        String path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt";

        // 1. Build the schema programmatically, field by field.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.LongType, true));
        StructType schema = DataTypes.createStructType(fields);

        // 2. Convert each text line into a Row matching that schema.
        JavaRDD<Row> rowRDD = spark.read().textFile(path).javaRDD()
                .map(new Function<String, Row>() {
                    @Override
                    public Row call(String line) throws Exception {
                        String[] parts = line.split(",");
                        return RowFactory.create(parts[0], Long.parseLong(parts[1].trim()));
                    }
                });

        // 3. Bind rows and schema together.
        Dataset<Row> personDF = spark.createDataFrame(rowRDD, schema);
        personDF.createOrReplaceTempView("person");
        Dataset<Row> resultDF = spark.sql("select * from person a where a.age < 30");
        resultDF.show();
    }
}
//Scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object rddtoDFprogrammScala {
  def main(args: Array[String]): Unit = {
    val spark = CommSparkSessionScala.getSparkSession()
    val path = "file:/Users/zhangjingyu/Desktop/spark-2.4.0/examples/src/main/resources/people.txt"

    // The schema is an ordinary value constructed at runtime.
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", LongType, true)))

    val rdd = spark.sparkContext.textFile(path)
      .map(line => line.split(","))
      .map(x => Row(x(0), x(1).trim.toLong))

    val personDF = spark.createDataFrame(rdd, schema)
    personDF.createOrReplaceTempView("person")
    val resultDF = spark.sql("select * from person a where a.age < 30")
    for (elem <- resultDF.collect()) {
      System.out.println(elem.get(0) + ":" + elem.get(1))
    }
  }
}
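The real payoff of the programmatic route is that the schema is an ordinary runtime value: it can be computed from a header line, a config file, or a metastore, which the case-class approach cannot do. A minimal sketch of that pattern, deriving the StructType from a header string rather than hard-coding it (the header contents and object name are made up for illustration):

//Scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DynamicSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Assume the column list only arrives at runtime, e.g. from a CSV header.
    val header = "name,age,city"
    val schema = StructType(
      header.split(",").map(col => StructField(col, StringType, nullable = true)))
    println(schema.treeString)
  }
}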
Original author: Zhang Jingyu (张景宇)