Using a transformation operator produces a new RDD: at that point a logical execution plan is built on the driver side, but no task has actually run yet. An action operator, on the other hand, is a method that triggers job execution (runJob); under the hood it calls the runJob method of the environment object (the SparkContext).
1. reduce
Function source code
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (_: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
Function description
In short, it first aggregates the data within each partition, then aggregates the per-partition results across partitions.
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_reduce_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    val i = rdd.reduce(_ + _)
    println(i) // 10
    // Partition data: [1, 2], [3, 4]
    // reduce aggregates all elements: first within each partition, then across partitions
    // [1, 2] => 3, [3, 4] => 7, then 3 + 7 => 10

    sc.stop()
  }
}
2. collect
Function source code
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
Function description
Returns all elements of the dataset to the driver program as an Array.
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_collect_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    val mapRDD = rdd.map(_ * 2)

    // Nothing runs until collect is called; up to this point the driver has only built a logical execution plan.
    // The collect source code contains a runJob call.
    println(mapRDD.collect().mkString(","))
    // collect gathers the per-partition data from the executors into an array on the driver, in partition order.
    // Output with collect: 2,4,6,8

    mapRDD.foreach(println)
    // Output without collect: 2 6 8 4 (order is not deterministic)

    sc.stop()
  }
}
3. count and first
Function source code
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}
first calls take under the hood, and take in turn calls runJob, so first is also an action operator.
Function description
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_count_first_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // count returns the number of elements in the RDD
    val cnt = rdd.count()
    println(cnt) // 4

    // first returns the first element of the data source
    val first = rdd.first()
    println(first) // 1

    sc.stop()
  }
}
4. take and takeOrdered
Function source code
def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      val left = num - buf.size
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }
    buf.toArray
  }
}
Function description
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_take_takeOrdered_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)

    println(rdd.take(3).mkString(" ")) // 4 3 2
    // take returns an array containing the first N elements of the data source.

    println(rdd.takeOrdered(3).mkString(", ")) // 1, 2, 3
    // takeOrdered returns a sorted array of the first N elements; its second (implicit) parameter
    // can supply the ordering rule. The Ordering trait in the source extends java.util.Comparator[T],
    // i.e. it plays the same role as a Java comparator.

    sc.stop()
  }
}
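The example above uses the default ascending Ordering. As a minimal sketch of the second parameter mentioned in the comments (this fragment is my own addition, assuming the same rdd and placed inside main before sc.stop()):

// Pass a reversed Ordering explicitly to take the 3 largest elements instead of the 3 smallest.
println(rdd.takeOrdered(3)(Ordering[Int].reverse).mkString(", "))
// 7, 6, 4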
5. aggregate
Function source code
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
Function description
The data in each partition is aggregated together with the initial (zero) value; the per-partition results are then aggregated across partitions, again starting from the initial value.
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_aggregate_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    val i = rdd.aggregate(0)(_ + _, _ + _)
    println(i) // 10
    // Within partitions: 0 + 1 + 2 = 3, 0 + 3 + 4 = 7
    // Across partitions: 0 + 3 + 7 = 10

    val i1 = rdd.aggregate(10)(_ + _, _ + _)
    println(i1) // 40
    // Within partitions: 10 + 1 + 2 = 13, 10 + 3 + 4 = 17
    // Across partitions: 10 + 13 + 17 = 40

    sc.stop()
  }
}
6. fold
Function source code: omitted.
Function description
fold is a folding operation, a simplified version of aggregate: the intra-partition aggregation logic and the inter-partition aggregation logic are the same function. A short example follows.
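Since this section gives no worked example, here is a minimal sketch following the pattern of the other examples (the object name Spark_RDD_fold_action and the chosen values are my own illustration):

import org.apache.spark.{SparkConf, SparkContext}

object Spark_RDD_fold_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // fold(zeroValue)(op) behaves like aggregate(zeroValue)(op, op):
    // the same function is used within partitions and across partitions.
    println(rdd.fold(0)(_ + _))  // 10
    println(rdd.fold(10)(_ + _)) // 40
    // Within partitions: 10 + 1 + 2 = 13, 10 + 3 + 4 = 17
    // Across partitions: 10 + 13 + 17 = 40

    sc.stop()
  }
}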
7. countByKey and countByValue
countByKey source code
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
The return type is a Map, i.e. Map[K, Long]: K is the key type of the key-value pairs, and Long is the number of times that key appears.
countByValue source code
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}
As the source shows, it maps every element of the RDD to a tuple (element, null) and then calls countByKey.
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_countByKey_countByValue_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))

    // Returns a Map whose key is the key of each key-value pair and whose value is how many times that key occurs.
    val rdd_key = rdd.countByKey()
    println(rdd_key)
    // Map(a -> 2, b -> 1, c -> 1)
    // "a" appears twice as a key, "b" once, "c" once, ...

    val rdd_value = rdd.countByValue()
    println(rdd_value)
    // Map((a,2) -> 1, (b,2) -> 1, (c,3) -> 1, (a,1) -> 1)
    // Internally it calls map and then countByKey.

    sc.stop()
  }
}
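In the example above each whole tuple counts as one "value", which is why every count is 1. For contrast, a minimal sketch of countByValue on a plain RDD of Ints (my own addition, assuming the same sc and placed before sc.stop()):

// Hypothetical follow-up: countByValue counts how often each distinct element occurs.
val nums = sc.makeRDD(List(1, 1, 2, 3, 3, 3))
println(nums.countByValue())
// Map(1 -> 2, 2 -> 1, 3 -> 3)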
8. save-related operators
The save-related operators include:
saveAsTextFile, saveAsObjectFile, saveAsSequenceFile
Function source code
def saveAsTextFile(path: String): Unit = withScope {
  saveAsTextFile(path, null)
}

def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u

  // TODO We cannot force the return type of anyToWritable be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != _keyWritableClass
  val convertValue = self.valueClass != _valueWritableClass

  logInfo("Saving as sequence file of type " +
    s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})")
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2)))
      .saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2))
      .saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2)))
      .saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  }
}
Function description
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_save_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))

    // Each save needs its own output directory, and the directory must not already exist.
    // Save as plain text files
    rdd.saveAsTextFile("datas_text")
    // Serialize the elements and save them as object files
    rdd.saveAsObjectFile("datas_object")
    // Requires the data to be key-value pairs
    rdd.saveAsSequenceFile("datas_sequence")

    sc.stop()
  }
}
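As a quick check, the saved files can be read back. This is a minimal sketch of my own (it assumes the output directories written above and the same sc), not part of the original example:

// Read back the three outputs written above.
val textRDD = sc.textFile("datas_text")                       // RDD[String], one line per element
val objectRDD = sc.objectFile[(String, Int)]("datas_object")  // deserialized (String, Int) pairs
val seqRDD = sc.sequenceFile[String, Int]("datas_sequence")   // key-value pairs from the sequence file
println(seqRDD.collect().mkString(", "))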
9. foreach
Function source code
Under the hood it, too, calls runJob.
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
Function description
Iterates over every element of the RDD in a distributed way, applying the given function on the executors.
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_foreach_action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)), 2)

    rdd.collect().foreach(println)
    // collect gathers the data to the driver in partition order before printing:
    // (a,1)
    // (b,2)
    // (c,3)
    // (a,2)

    println("*************")

    rdd.foreach(println)
    // foreach prints directly on the executors, against the in-memory partition data,
    // so the order is not deterministic:
    // (c,3)
    // (a,2)
    // (a,1)
    // (b,2)

    sc.stop()
  }
}