深圳网站. 方维网络,手机网站功能开发方案,搭建网站的网站,wordpress给文章设置标签spark内存计算框架 一、目标
深入理解RDD弹性分布式数据集底层原理掌握RDD弹性分布式数据集的常用算子操作
二、要点
⭐️1. RDD是什么
RDD#xff08;Resilient Distributed Dataset#xff09;叫做**弹性分布式数据集#xff0c;是Spark中最基本的数据抽象#xff0c…spark内存计算框架 一、目标
深入理解RDD弹性分布式数据集底层原理掌握RDD弹性分布式数据集的常用算子操作
二、要点
⭐️1. RDD是什么
RDDResilient Distributed Dataset叫做**弹性分布式数据集是Spark中最基本的数据抽象它代表一个不可变、可分区、里面的元素可并行计算**的集合. Dataset: 就是一个集合存储很多数据.Distributed它内部的元素进行了分布式存储方便于后期进行分布式计算.Resilient 表示弹性rdd的数据是可以保存在内存或者是磁盘中.
⭐️2. RDD的五大属性 1A list of partitions 一个分区Partition列表数据集的基本组成单位。 这里表示一个rdd有很多分区每一个分区内部是包含了该rdd的部分数据
spark中任务是以task线程的方式运行 一个分区就对应一个task线程。用户可以在创建RDD时指定RDD的分区个数如果没有指定那么就会采用默认值。val rddsparkContext.textFile(/words.txt)如果该文件的block块个数小于等于2这里生产的RDD分区数就为2如果该文件的block块个数大于2这里生产的RDD分区数就与block块个数保持一致
2A function for computing each split 一个计算每个分区的函数 Spark中RDD的计算是以分区为单位的每个RDD都会实现compute计算函数以达到这个目的.3A list of dependencies on other RDDs 一个rdd会依赖于其他多个rdd 这里就涉及到rdd与rdd之间的依赖关系spark任务的容错机制就是根据这个特性血统而来。
4Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 一个Partitioner即RDD的分区函数可选项
当前Spark中实现了两种类型的分区函数
一个是基于哈希的HashPartitioner(key.hashcode % 分区数 分区号)
另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD,并且产生shuffle才会有Partitioner非key-value的RDD的Parititioner的值是None。5Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 一个列表存储每个Partition的优先位置(可选项)
这里涉及到数据的本地性数据块位置最优。
spark任务在调度的时候会优先考虑存有数据的节点开启计算任务减少数据的网络传输提升计算效率。3. 基于spark的单词统计程序剖析rdd的五大属性 需求 HDFS上有一个大小为300M的文件通过spark实现文件单词统计最后把结果数据保存到HDFS上代码 sc.textFile(/words.txt).flatMap(_.split( )).map((_,1)).reduceByKey(__).saveAsTextFile(/out)流程分析 4. RDD的创建方式 1、通过已经存在的scala集合去构建 val rdd1sc.parallelize(List(1,2,3,4,5))
val rdd2sc.parallelize(Array(hadoop,hive,spark))
val rdd3sc.makeRDD(List(1,2,3,4))2、加载外部的数据源去构建 val rdd1sc.textFile(/words.txt)3、从已经存在的rdd进行转换生成一个新的rdd val rdd2rdd1.flatMap(_.split( ))
val rdd3rdd2.map((_,1))⭐️5. RDD的算子分类
1、transformation转换 根据已经存在的rdd转换生成一个新的rdd, 它是延迟加载它不会立即执行例如 map / flatMap / reduceByKey 等 2、action (动作) 它会真正触发任务的运行 将rdd的计算的结果数据返回给Driver端或者是保存结果数据到外部存储介质中 例如 collect / saveAsTextFile 等
6. RDD常见的算子操作说明
6.1 transformation算子
转换含义map(func)返回一个新的RDD该RDD由每一个输入元素经过func函数转换后组成filter(func)返回一个新的RDD该RDD由经过func函数计算后返回值为true的输入元素组成flatMap(func)类似于map但是每一个输入元素可以被映射为0或多个输出元素所以func应该返回一个序列而不是单一元素mapPartitions(func)类似于map但独立地在RDD的每一个分片上运行因此在类型为T的RDD上运行时func的函数类型必须是Iterator[T] Iterator[U]mapPartitionsWithIndex(func)类似于mapPartitions但func带有一个整数参数表示分片的索引值因此在类型为T的RDD上运行时func的函数类型必须是(Int, Interator[T]) Iterator[U]union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDDintersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDDdistinct([numTasks]))对源RDD进行去重后返回一个新的RDDgroupByKey([numTasks])在一个(K,V)的RDD上调用返回一个(K, Iterator[V])的RDDreduceByKey(func, [numTasks])在一个(K,V)的RDD上调用返回一个(K,V)的RDD使用指定的reduce函数将相同key的值聚合到一起与groupByKey类似reduce任务的个数可以通过第二个可选的参数来设置sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用K必须实现Ordered接口返回一个按照key进行排序的(K,V)的RDDsortBy(func,[ascending], [numTasks])与sortByKey类似但是更灵活join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDDcogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用返回一个(K,(Iterable,Iterable))类型的RDDcoalesce(numPartitions)减少 RDD 的分区数到指定值。repartition(numPartitions)重新给 RDD 分区repartitionAndSortWithinPartitions(partitioner)重新给 RDD 分区并且每个分区内以记录的 key 排序
6.2 action算子
动作含义reduce(func)reduce将RDD中元素前两个传给输入函数产生一个新的return值新产生的return值与RDD中下一个元素第三个元素组成两个元素再被传给输入函数直到最后只有一个值为止。collect()在驱动程序中以数组的形式返回数据集的所有元素count()返回RDD的元素个数first()返回RDD的第一个元素类似于take(1)take(n)返回一个由数据集的前n个元素组成的数组takeOrdered(n, [ordering])返回自然顺序或者自定义顺序的前 n 个元素saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统对于每个元素Spark将会调用toString方法将它装换为文件中的文本saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下可以使HDFS或者其他Hadoop支持的文件系统。saveAsObjectFile(path)将数据集的元素以 Java 序列化的方式保存到指定的目录下countByKey()针对(K,V)类型的RDD返回一个(K,Int)的map表示每一个key对应的元素个数。⭐️foreach(func)在数据集的每一个元素上运行函数func⭐️foreachPartition(func)在数据集的每一个分区上运行函数func
7. RDD常用的算子操作演示 为了方便前期的测试和学习可以使用spark-shell进行演示 spark-shell --master local[2]7.1 mapTrans转换算子
**map(func)**返回一个新的RDD该RDD由每一个输入元素经过func函数转换后组成
val rdd1 sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))//把rdd1中每一个元素乘以10
rdd1.map(_*10).collect7.2 filterTrans转换算子
**filter(func)**返回一个新的RDD该RDD由经过func函数计算后返回值为true的输入元素组成
val rdd1 sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))//把rdd1中大于5的元素进行过滤
rdd1.filter(x x 5).collect7.3 flatMapTrans转换算子
flatMap(func) 类似于map但是每一个输入元素可以被映射为0或多个输出元素所以func应该返回一个序列而不是单一元素
val rdd1 sc.parallelize(Array(a b c, d e f, h i j))
//获取rdd1中元素的每一个字母
rdd1.flatMap(_.split( )).collect7.4 intersection、unionTrans转换算子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
val rdd1 sc.parallelize(List(5, 6, 4, 3))
val rdd2 sc.parallelize(List(1, 2, 3, 4))
//求交集
rdd1.intersection(rdd2).collect//求并集
rdd1.union(rdd2).collect7.5 distinctTrans转换算子
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
val rdd1 sc.parallelize(List(1,1,2,3,3,4,5,6,7))
//去重
rdd1.distinct7.6 join、groupByKeyTrans转换算子
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用返回一个(K, Iterator[V])的RDD
val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2)))
val rdd2 sc.parallelize(List((jerry, 2), (tom, 1), (shuke, 2)))
//求join
val rdd3 rdd1.join(rdd2)
rdd3.collect
//求并集
val rdd4 rdd1 union rdd2
rdd4.groupByKey.collect7.7 cogroupTrans转换算子
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用返回一个(K,(Iterable,Iterable))类型的RDD
collect() 在驱动程序中以数组的形式返回数据集的所有元素
val rdd1 sc.parallelize(List((tom, 1), (tom, 2), (jerry, 3), (kitty, 2)))
val rdd2 sc.parallelize(List((jerry, 2), (tom, 1), (jim, 2)))
//分组
val rdd3 rdd1.cogroup(rdd2)
rdd3.collect
//
//res0: Array[(String, (Iterable[Int], Iterable[Int]))]
//Array(
// (jim,(CompactBuffer(),CompactBuffer(2))),
// (tom,(CompactBuffer(1, 2),CompactBuffer(1))),
// (jerry,(CompactBuffer(3),CompactBuffer(2))),
// (kitty,(CompactBuffer(2),CompactBuffer()))
// )7.8 reduce Action动作算子
reduce(func) reduce将RDD中元素前两个传给输入函数产生一个新的return值新产生的return值与RDD中下一个元素第三个元素组成两个元素再被传给输入函数直到最后只有一个值为止。
val rdd1 sc.parallelize(List(1, 2, 3, 4, 5))//reduce聚合
val rdd2 rdd1.reduce(_ _)
rdd2.collectval rdd3 sc.parallelize(List(1,2,3,4,5))
rdd3.reduce(__)这里可能会出现多个不同的结果由于元素在不同的分区中每一个分区都是一个独立的task线程去运行。这些task运行有先后关系7.9 reduceByKey、sortByKeyTrans转换算子
groupByKey([numTasks]) 在一个(K,V)的RDD上调用返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用返回一个(K,V)的RDD使用指定的reduce函数将相同key的值聚合到一起与groupByKey类似reduce任务的个数可以通过第二个可选的参数来设置 不同于groupByKey()reduceByKey会在map端join
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用K必须实现Ordered接口返回一个按照key进行排序的(K,V)的RDD
val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2), (shuke, 1)))
val rdd2 sc.parallelize(List((jerry, 2), (tom, 3), (shuke, 2), (kitty, 5)))
val rdd3 rdd1.union(rdd2)//按key进行聚合
val rdd4 rdd3.reduceByKey(_ _)
rdd4.collect//按value的降序排序
val rdd5 rdd4.map(t (t._2, t._1)).sortByKey(false).map(t (t._2, t._1))
rdd5.collect7.10 repartition、coalesceTrans转换算子
coalesce(numPartitions) 减少 RDD 的分区数到指定值默认不会产生shuffle传入true开启shuffle
repartition(numPartitions) 重新给 RDD 分区会产生shuffle 相当于coalesce(numPatitions,true)**
val rdd1 sc.parallelize(1 to 10,3)
//打印rdd1的分区数
rdd1.partitions.size//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.size//增加分区
rdd1.repartition(4).partitions.size//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size//repartition: 重新分区 有shuffle
//coalesce: 合并分区 / 减少分区 默认不shuffle
//默认 coalesce 不能扩大分区数量。除非添加true的参数或者使用repartition。//适用场景//1、如果要shuffle都用 repartition//2、不需要shuffle仅仅是做分区的合并coalesce//3、repartition常用于扩大分区。
⭐️7.11 map、mapPartitions 、mapPartitionsWithIndexTrans转换算子
map(func) 返回一个新的RDD该RDD由每一个输入元素经过func函数转换后组成
mapPartitions(func) 类似于map但独立地在RDD的每一个分片上运行因此在类型为T的RDD上运行时func的函数类型必须是Iterator[T] Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions但func带有一个整数参数表示分片的索引值因此在类型为T的RDD上运行时func的函数类型必须是(Int, Interator[T]) Iterator[U]
val rdd1sc.parallelize(1 to 10,5)
rdd1.map(x x*10)).collect
rdd1.mapPartitions(iter iter.map(xx*10)).collect//index表示分区号 可以获取得到每一个元素属于哪一个分区
rdd1.mapPartitionsWithIndex((index,iter)iter.map(x(index,x)))map用于遍历RDD,将函数f应用于每一个元素返回新的RDD(transformation算子)。
mapPartitions:用于遍历操作RDD中的每一个分区返回生成一个新的RDDtransformation算子。总结
如果在映射的过程中需要频繁创建额外的对象使用mapPartitions要比map高效
比如将RDD中的所有数据通过JDBC连接写入数据库如果使用map函数可能要为每一个元素都创建一个connection这样开销很大如果使用mapPartitions那么只需要针对每一个分区建立一个connection。⭐️7.12 foreach、foreachPartition Action动作算子
foreach(func) 在数据集的每一个元素上运行函数func
foreachPartition(func) 在数据集的每一个分区上运行函数func
val rdd1 sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))//foreach实现对rdd1里的每一个元素乘10然后打印输出
rdd1.foreach(xprintln(x * 10))//foreachPartition实现对rdd1里的每一个元素乘10然后打印输出
rdd1.foreachPartition(iter iter.foreach(xprintln(x * 10)))foreach:用于遍历RDD,将函数f应用于每一个元素无返回值(action算子)。
foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。总结
一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效推荐使用。