有南昌网站优化公司,鼓楼做网站公司哪家好,温州网站优化定制,seo关键词排名技巧DataSet DataFrame 的出现#xff0c;让 Spark 可以更好地处理结构化数据的计算#xff0c;但存在一个问题#xff1a;编译时的类型安全问题#xff0c;为了解决它#xff0c;Spark 引入了 DataSet API#xff08;DataFrame API 的扩展#xff09;。DataSet 是分布式的数…DataSet DataFrame 的出现让 Spark 可以更好地处理结构化数据的计算但存在一个问题编译时的类型安全问题为了解决它Spark 引入了 DataSet APIDataFrame API 的扩展。DataSet 是分布式的数据集合它提供了强类型支持也就是给 RDD 的每行数据都添加了类型约束。 在 Spark 2.0 中DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能DataFrame 表示为 DataSet[Row] 即DataSet 的子集。 
三种 API 的选择 RDD 是DataFrame 和 DataSet 的底层如果需要更多的控制功能比如精确控制Spark 怎么执行一条查询尽量使用 RDD。 如果希望在编译时获得更高的类型安全建议使用 DataSet。 如果想统一简化 Spark 的API 则使用 DataFrame 和 DataSet。 基于 DataFrame API 和 DataSet API 开发的程序会被自动优化开发人员不需要操作底层的RDD API 来手动优化大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势比如文本数据流而且更方便我们做底层的操作。 
DataSet 的创建 
1、使用createDataset方法创建 
def main(args: Array[String]): Unit  {//local代表本地单线程模式 local[*]代表本地多线程模式val spark  SparkSession.builder().appName(create dataset).master(local[*]).getOrCreate()//一定要导入它 不然无法创建DataSet对象import spark.implicits._val ds1  spark.createDataset(1 to 5)ds1.show()val ds2  spark.createDataset(spark.sparkContext.textFile(data/sql/people.txt))ds2.show()spark.stop()} 
运行结果 
-----
|value|
-----
|    1|
|    2|
|    3|
|    4|
|    5|
-------------
|   value|
--------
| Tom, 21|
|Mike, 25|
|Andy, 18|
-------- 
2、通过 toDS 方法生成 
import org.apache.spark.sql.{Dataset, SparkSession}object DataSetCreate {//case类一定要写到main方法之外case class Person(name:String,age:Int)def main(args: Array[String]): Unit  {//local代表本地单线程模式 local[*]代表本地多线程模式val spark  SparkSession.builder().appName(create dataset).master(local[*]).getOrCreate()//一定要导入 SparkSession对象下的implicitsimport spark.implicits._val data  List(Person(Tom,21),Person(Andy,22))val ds: Dataset[Person]  data.toDS()ds.show()spark.stop()}
}运行结果 
-------
|name|age|
-------
| Tom| 21|
|Andy| 22|
------- 
3、通过DataFrame 转换生成 
需要注意json中的数 object DataSetCreate{case class Person(name:String,age:Long,sex:String)
def main(args: Array[String]): Unit  {//local代表本地单线程模式 local[*]代表本地多线程模式val spark  SparkSession.builder().appName(create dataset).master(local[*]).getOrCreate()import spark.implicits._val df  spark.read.json(data/sql/people.json)val ds  df.as[Person]ds.show()spark.stop()}
} 
RDD、DataFrame 和 DataSet 之间的相互转换 
RDD  DataFrame 
RDD 转 DataFrame 也就是上一篇博客中介绍的两种方法 能创建case类就直接映射出一个RDD[Person]然后调用toDF方法利用反射机制推断RDD模式。无法创建case类就使用编程方式定义RDD模式使用 createDataFrame(rowRDD,schema) 指定rowRDDRDD[Row]和schemaStructType。DataFrame 转 RDD直接使用 rdd() 方法。 
package com.study.spark.core.sqlimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}object TransForm {case class Person(name:String,age:Int)  //txt文件age字段可以用Int,但json文件尽量用Longdef main(args: Array[String]): Unit  {val spark  SparkSession.builder().appName(transform).master(local[*]).getOrCreate()import spark.implicits._//1.RDD和DataFrame之间互相转换//1.1 创建RDD对象val rdd: RDD[Person]  spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attr  Person(attr(0), attr(1).trim.toInt))rdd.foreach(println)/*Person(Andy,18)Person(Tom,21)Person(Mike,25)*///1.2 RDD转DataFrameval df  rdd.toDF()df.show()/*-------|name|age|-------| Tom| 21||Mike| 25||Andy| 18|-------*///1.3 DataFrame转RDDval res: RDD[Row]  df.rdd/*[Andy,18][Tom,21][Mike,25]*/res.foreach(println)spark.stop()}
}可以看到RDD[Person]转为DataFrame后再从DataFrame转回RDD就变成了RDD[Row] 类型了。 RDD  DataSet 
RDD 和 DataSet 之间的转换比较简单 
RDD 转 DataSet 直接使用case 类比如Person然后映射出 RDD[Person] 直接调用 toDS方法。DataSet 转 RDD 直接调用 rdd方法即可。 import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}object TransForm {case class Person(name:String,age:Int)  //txt文件age字段可以用Int,但json文件尽量用Longdef main(args: Array[String]): Unit  {val spark  SparkSession.builder().appName(transform).master(local[*]).getOrCreate()import spark.implicits._//1.RDD和DataSet之间互相转换//1.1 创建RDD对象val rdd: RDD[Person]  spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attr  Person(attr(0), attr(1).trim.toInt))rdd.foreach(println)/*Person(Andy,18)Person(Tom,21)Person(Mike,25)*///1.2 RDD转DataSetval ds  rdd.toDS()ds.show()/*-------|name|age|-------| Tom| 21||Mike| 25||Andy| 18|-------*///1.3 DataFrame转RDDval res: RDD[Person]  ds.rddres.foreach(println)/*Person(Andy,18)Person(Tom,21)Person(Mike,25)*/spark.stop()}
}可以看到相比RDD和DataFrame互相转换RDD和DataSet转换的过程中不会有数据类型的变化而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。 
DataFrame  DataSet 
DataFrame 转 DataSet 先使用case类然后直接使用 as[case 类] 方法。DataSet 转 DataFrame 直接使用 toDF 方法。 
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object TransForm {case class Person(name:String,age:Long,sex:String)  //txt文件age字段可以用Int,但json文件尽量用Longdef main(args: Array[String]): Unit  {val spark  SparkSession.builder().appName(transform).master(local[*]).getOrCreate()import spark.implicits._//1.DataFrame和DataSet之间互相转换//1.1 创建DataFrame对象val df  spark.read.json(data/sql/people.json)df.show()/*----------------|age|      name|sex|----------------| 30|   Michael| 男|| 19|      Andy| 女|| 19|    Justin| 男|| 20|Bernadette| 女|| 23|  Gretchen| 女|| 27|     David| 男|| 33|    Joseph| 女|| 27|     Trish| 女|| 33|      Alex| 女|| 25|       Ben| 男|----------------*///1.2 DataFrame转DataSetval ds  df.as[Person]ds.show()/*----------------|age|      name|sex|----------------| 30|   Michael| 男|| 19|      Andy| 女|| 19|    Justin| 男|| 20|Bernadette| 女|| 23|  Gretchen| 女|| 27|     David| 男|| 33|    Joseph| 女|| 27|     Trish| 女|| 33|      Alex| 女|| 25|       Ben| 男|----------------*///1.3 DataSet转DataFrameval res: DataFrame  ds.toDF()res.show()/*----------------|age|      name|sex|----------------| 30|   Michael| 男|| 19|      Andy| 女|| 19|    Justin| 男|| 20|Bernadette| 女|| 23|  Gretchen| 女|| 27|     David| 男|| 33|    Joseph| 女|| 27|     Trish| 女|| 33|      Alex| 女|| 25|       Ben| 男|----------------*/spark.stop()}
}DataSet 实现 WordCount 
def main(args: Array[String]): Unit  {val spark  SparkSession.builder().appName(create dataset).master(local[*]).getOrCreate()import spark.implicits._val res: Dataset[(String, Long)]  spark.read.text(data/word.txt).as[String].flatMap(_.split( )).groupByKey(_.toLowerCase).count()res.show()spark.stop()} 
运行结果: 
|   key|count(1)|
--------------
|  fast|       1|
|    is|       3|
| spark|       2|
|better|       1|
|  good|       1|
|hadoop|       1|
-------------- 总结 剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。