当前位置: 首页 > news >正文

建设文明网 联盟网站的wordpress增加广告

建设文明网 联盟网站的,wordpress增加广告,东莞找做网站的,美团网站除佣金表格怎么做使用转换算子是产生一个新的rdd#xff0c;此时在driver端会生成一个逻辑上的执行计划#xff0c;但任务还没有执行。但所谓的行动算子#xff0c;其实就是触发作业执行的方法#xff08;runJob#xff09;。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码此时在driver端会生成一个逻辑上的执行计划但任务还没有执行。但所谓的行动算子其实就是触发作业执行的方法runJob。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码 def reduce(f: (T, T) T): T withScope {val cleanF sc.clean(f)val reducePartition: Iterator[T] Option[T] iter {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] Noneval mergeResult (_: Int, taskResult: Option[T]) {if (taskResult.isDefined) {jobResult jobResult match {case Some(value) Some(f(value, taskResult.get))case None taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException(empty collection))} 函数说明 简而言之就是先聚合分区内的数据再聚合分区间的数据。 object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)val i rdd.reduce(_ _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素先聚合分区内的数据再聚合分区间的数据// [1, 2]3, [3, 4]7 3 7 10} } 2. collect 函数源码 def collect(): Array[T] withScope {val results sc.runJob(this, (iter: Iterator[T]) iter.toArray)Array.concat(results: _*)} 函数说明 在驱动程序中以数组Array的形式返回数据集的所有元素。 object Spark02_RDD_collect_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(,))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出// 2 6 8 4无序} } 3. count 和 first 函数源码 def count(): Long sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T withScope {take(1) match {case Array(t) tcase _ throw new UnsupportedOperationException(empty collection)}} first函数底层调用了take函数take函数底层调用了runJob函数所以first也是行动算子。 函数说明 object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first rdd.first()println(first)// 1sc.stop()} } 4. take和takeOrdered 函数源码 def take(num: Int): Array[T] withScope {val scaleUpFactor Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num 0) {new Array[T](0)} else {val buf new ArrayBuffer[T]val totalParts this.partitions.lengthvar partsScanned 0while (buf.size num partsScanned totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry 1Lval left num - buf.sizeif (partsScanned 0) {// If we didnt find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry partsScanned * scaleUpFactor} else {// As left 0, numPartsToTry is always 1numPartsToTry Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p partsScanned.until(math.min(partsScanned numPartsToTry, totalParts).toInt)val res sc.runJob(this, (it: Iterator[T]) it.take(left).toArray, p)res.foreach(buf _.take(num - buf.size))partsScanned p.size}buf.toArray}} 函数说明 object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString( ))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(, ))// 1 2 3// takeOrdered方法返回的是一个有序数组第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T]即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()} } 5. aggregate 函数源码 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) U, combOp: (U, U) U): U withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp sc.clean(seqOp)val cleanCombOp sc.clean(combOp)val aggregatePartition (it: Iterator[T]) it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult (_: Int, taskResult: U) jobResult combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult} 函数说明 分区的数据通过初始值和分区内的数据进行聚合然后再和初始值进行分区间的数据聚合。 object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)val i rdd.aggregate(0)(_ _, _ _)println(i)// 10// 0 1 2 3, 0 3 4 7// 0 3 7 10val i1 rdd.aggregate(10)(_ _, _ _)println(i1)// 40// 10 1 2 13, 10 3 4 17// 10 13 17 40sc.stop()} } 6. fold 函数源码; 略。 函数说明 折叠操作aggregate的简化版操作。 即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。 7. countByKey和countByValue countByKey函数源码 def countByKey(): Map[K, Long] self.withScope {self.mapValues(_ 1L).reduceByKey(_ _).collect().toMap} 返回值类型是map集合。即Map[K, long] K即是key-value的key类型Long则是key出现的次数。 countByValue函数源码 def countByValue()(implicit ord: Ordering[T] null): Map[T, Long] withScope {map(value (value, null)).countByKey()} 得知底层调用了map方法将rdd的每个数据映射为一个元组然后调用countByKey方法。 object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)))// 返回的是Map集合key是key-value的keyvalue是key出现的countval rdd_key rdd.countByKey()println(rdd_key)// Map(a - 2, b - 1, c - 1)// a作为key出现了两次b作为key出现了一次...val rdd_value rdd.countByValue()println(rdd_value)// Map((a,2) - 1, (b,2) - 1, (c,3) - 1, (a,1) - 1)// 底层调用了map方法和countbyKey方法sc.stop()} } 8. save相关算子 save相关算子包括 saveAsTextFile, saveAsObjectFile, saveAsSequenceFile 函数源码 def saveAsTextFile(path: String): Unit withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit withScope {this.mapPartitions(iter iter.grouped(10).map(_.toArray)).map(x (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ : CompressionCodec]] None): Unit self.withScope {def anyToWritable[U % Writable](u: U): Writable u// TODO We cannot force the return type of anyToWritable be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey self.keyClass ! _keyWritableClassval convertValue self.valueClass ! _valueWritableClasslogInfo(Saving as sequence file of type s(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName}) )val format classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf new JobConf(self.context.hadoopConfiguration)if (!convertKey !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey convertValue) {self.map(x (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey !convertValue) {self.map(x (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey convertValue) {self.map(x (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}} }函数说明 object Spark06_RDD_save_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)))// 保存在文件中rdd.saveAsTextFile(datas)// 序列化成对象保存在文件中rdd.saveAsObjectFile(datas)// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile(datas)sc.stop()} } 9. foreach 函数源码 底层一直在调用runJob函数。 def foreach(f: T Unit): Unit withScope {val cleanF sc.clean(f)sc.runJob(this, (iter: Iterator[T]) iter.foreach(cleanF))} 函数说明 分布式遍历RDD中的每一个元素调用该函数。 object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端 // (a,1) // (b,2) // (c,3) // (a,2)println(*************)// 而foreach直接在executor端内存数据的打印rdd.foreach(println) // (c,3)//(a,2)//(a,1)//(b,2)sc.stop()} }
http://www.dnsts.com.cn/news/131600.html

相关文章:

  • 做网站全包营销型网站建设风格设定包括哪些方面?
  • 重庆建站模板搭建公司网站建设费用预算
  • 西安做网站的公司北京公司招聘
  • 云南建设注册考试中心网站深圳福田香格里拉酒店
  • 做电影网站考什么成都管控最新通告
  • 买服务器做网站 镜像选什么美容加盟的网站建设
  • 网站项目开发收费标准合肥设计网站公司
  • 从化手机网站建设tv域名的网站
  • 建设项目环境影响评价登记表网站jsp小型网站开发代码
  • 网站优化推广费用学做网站课程
  • 娄底网站开发室内设计图片大全
  • 王者荣耀网站建设的步骤阿里巴巴网站建设建议
  • 怎样查一个网站的空间商网站优化一般怎么做
  • 南宁做网站比较好的公司网站开发中期检查
  • 做礼品的网站qingdao城乡住房建设厅网站
  • 自己做抽奖网站违法重庆有那些公司
  • 网站查询系统桂平网站建设
  • 网站排名怎么上去北京设计院排名前十强
  • 网站建站卖首饰侵权郑州天道做网站
  • 网站管理后台地址怎么查询域名网站教程
  • 网站建设报告 商业价值网络哪个公司便宜又好
  • 初二做网站的首页模板现在网站建设用dw哪个版本好
  • 如何选择郑州网站建设合肥市住房和城乡建设局官网
  • 网络有限公司经营范围有哪些网站需要优化的小型公司
  • 廊坊企业网站排名优化电商小程序开发方案
  • 商丘做网站企业网站做口碑营销
  • 长沙网站开发微联有效果的网站排名
  • 专门做产品推广ppt的网站商业网站定义
  • 滑县住房城乡建设厅门户网站网站制作现在赚钱么
  • 网站开发电话正规手表回收网站