内网门户网站 建设方案,游戏开发大亨最佳搭配,wordpress全自动赚钱,广告传媒公司介绍使用转换算子是产生一个新的rdd#xff0c;此时在driver端会生成一个逻辑上的执行计划#xff0c;但任务还没有执行。但所谓的行动算子#xff0c;其实就是触发作业执行的方法#xff08;runJob#xff09;。底层代码调用的是环境对象的runJob方法。 1. reduce 
函数源码此时在driver端会生成一个逻辑上的执行计划但任务还没有执行。但所谓的行动算子其实就是触发作业执行的方法runJob。底层代码调用的是环境对象的runJob方法。 1. reduce 
函数源码 
def reduce(f: (T, T)  T): T  withScope {val cleanF  sc.clean(f)val reducePartition: Iterator[T]  Option[T]  iter  {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T]  Noneval mergeResult  (_: Int, taskResult: Option[T])  {if (taskResult.isDefined) {jobResult  jobResult match {case Some(value)  Some(f(value, taskResult.get))case None  taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException(empty collection))} 
函数说明 
简而言之就是先聚合分区内的数据再聚合分区间的数据。 
object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List(1, 2, 3, 4), 2)val i  rdd.reduce(_  _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素先聚合分区内的数据再聚合分区间的数据// [1, 2]3, [3, 4]7    3  7  10}
} 
2. collect 
函数源码 
def collect(): Array[T]  withScope {val results  sc.runJob(this, (iter: Iterator[T])  iter.toArray)Array.concat(results: _*)} 
函数说明 
在驱动程序中以数组Array的形式返回数据集的所有元素。 
object Spark02_RDD_collect_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD  rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(,))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出// 2    6     8     4无序}
} 
3. count 和 first 
函数源码 
def count(): Long  sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T  withScope {take(1) match {case Array(t)  tcase _  throw new UnsupportedOperationException(empty collection)}} 
first函数底层调用了take函数take函数底层调用了runJob函数所以first也是行动算子。 
函数说明 
object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt  rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first  rdd.first()println(first)// 1sc.stop()}
} 
4. take和takeOrdered 
函数源码 
def take(num: Int): Array[T]  withScope {val scaleUpFactor  Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num  0) {new Array[T](0)} else {val buf  new ArrayBuffer[T]val totalParts  this.partitions.lengthvar partsScanned  0while (buf.size  num  partsScanned  totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry  1Lval left  num - buf.sizeif (partsScanned  0) {// If we didnt find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry  partsScanned * scaleUpFactor} else {// As left  0, numPartsToTry is always  1numPartsToTry  Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry  Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p  partsScanned.until(math.min(partsScanned  numPartsToTry, totalParts).toInt)val res  sc.runJob(this, (it: Iterator[T])  it.take(left).toArray, p)res.foreach(buf  _.take(num - buf.size))partsScanned  p.size}buf.toArray}} 
函数说明 
object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString( ))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(, ))// 1 2 3// takeOrdered方法返回的是一个有序数组第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T]即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()}
} 5. aggregate 
函数源码 
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T)  U, combOp: (U, U)  U): U  withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult  Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp  sc.clean(seqOp)val cleanCombOp  sc.clean(combOp)val aggregatePartition  (it: Iterator[T])  it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult  (_: Int, taskResult: U)  jobResult  combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult} 
函数说明 
分区的数据通过初始值和分区内的数据进行聚合然后再和初始值进行分区间的数据聚合。 
object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List(1, 2, 3, 4), 2)val i  rdd.aggregate(0)(_  _, _  _)println(i)// 10// 0  1  2  3, 0  3  4  7// 0  3  7  10val i1  rdd.aggregate(10)(_  _, _  _)println(i1)// 40// 10  1  2  13,  10  3  4  17// 10  13  17  40sc.stop()}
} 
6. fold 
函数源码; 
略。 
函数说明 
折叠操作aggregate的简化版操作。 
即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。 
7. countByKey和countByValue 
countByKey函数源码 
def countByKey(): Map[K, Long]  self.withScope {self.mapValues(_  1L).reduceByKey(_  _).collect().toMap} 
返回值类型是map集合。即Map[K, long] 
K即是key-value的key类型Long则是key出现的次数。 
countByValue函数源码 
def countByValue()(implicit ord: Ordering[T]  null): Map[T, Long]  withScope {map(value  (value, null)).countByKey()} 
得知底层调用了map方法将rdd的每个数据映射为一个元组然后调用countByKey方法。 
object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)))// 返回的是Map集合key是key-value的keyvalue是key出现的countval rdd_key  rdd.countByKey()println(rdd_key)// Map(a - 2, b - 1, c - 1)// a作为key出现了两次b作为key出现了一次...val rdd_value  rdd.countByValue()println(rdd_value)// Map((a,2) - 1, (b,2) - 1, (c,3) - 1, (a,1) - 1)// 底层调用了map方法和countbyKey方法sc.stop()}
} 
8. save相关算子 
save相关算子包括 
saveAsTextFile, saveAsObjectFile, saveAsSequenceFile 
函数源码 
def saveAsTextFile(path: String): Unit  withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit  withScope {this.mapPartitions(iter  iter.grouped(10).map(_.toArray)).map(x  (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ : CompressionCodec]]  None): Unit  self.withScope {def anyToWritable[U % Writable](u: U): Writable  u// TODO We cannot force the return type of anyToWritable be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey  self.keyClass ! _keyWritableClassval convertValue  self.valueClass ! _valueWritableClasslogInfo(Saving as sequence file of type  s(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName}) )val format  classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf  new JobConf(self.context.hadoopConfiguration)if (!convertKey  !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey  convertValue) {self.map(x  (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey  !convertValue) {self.map(x  (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey  convertValue) {self.map(x  (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}}
}函数说明 
object Spark06_RDD_save_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)))// 保存在文件中rdd.saveAsTextFile(datas)// 序列化成对象保存在文件中rdd.saveAsObjectFile(datas)// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile(datas)sc.stop()}
} 
9. foreach 
函数源码 
底层一直在调用runJob函数。 
def foreach(f: T  Unit): Unit  withScope {val cleanF  sc.clean(f)sc.runJob(this, (iter: Iterator[T])  iter.foreach(cleanF))} 
函数说明 
分布式遍历RDD中的每一个元素调用该函数。 
object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit  {val sparkConf  new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc  new SparkContext(sparkConf)val rdd  sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端
//    (a,1)
//    (b,2)
//    (c,3)
//    (a,2)println(*************)// 而foreach直接在executor端内存数据的打印rdd.foreach(println)
//    (c,3)//(a,2)//(a,1)//(b,2)sc.stop()}
}