当前位置: 首页 > news >正文

内网门户网站 建设方案游戏开发大亨最佳搭配

内网门户网站 建设方案,游戏开发大亨最佳搭配,wordpress全自动赚钱,广告传媒公司介绍使用转换算子是产生一个新的rdd#xff0c;此时在driver端会生成一个逻辑上的执行计划#xff0c;但任务还没有执行。但所谓的行动算子#xff0c;其实就是触发作业执行的方法#xff08;runJob#xff09;。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码此时在driver端会生成一个逻辑上的执行计划但任务还没有执行。但所谓的行动算子其实就是触发作业执行的方法runJob。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码 def reduce(f: (T, T) T): T withScope {val cleanF sc.clean(f)val reducePartition: Iterator[T] Option[T] iter {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] Noneval mergeResult (_: Int, taskResult: Option[T]) {if (taskResult.isDefined) {jobResult jobResult match {case Some(value) Some(f(value, taskResult.get))case None taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException(empty collection))} 函数说明 简而言之就是先聚合分区内的数据再聚合分区间的数据。 object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)val i rdd.reduce(_ _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素先聚合分区内的数据再聚合分区间的数据// [1, 2]3, [3, 4]7 3 7 10} } 2. collect 函数源码 def collect(): Array[T] withScope {val results sc.runJob(this, (iter: Iterator[T]) iter.toArray)Array.concat(results: _*)} 函数说明 在驱动程序中以数组Array的形式返回数据集的所有元素。 object Spark02_RDD_collect_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(,))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出// 2 6 8 4无序} } 3. count 和 first 函数源码 def count(): Long sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T withScope {take(1) match {case Array(t) tcase _ throw new UnsupportedOperationException(empty collection)}} first函数底层调用了take函数take函数底层调用了runJob函数所以first也是行动算子。 函数说明 object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first rdd.first()println(first)// 1sc.stop()} } 4. take和takeOrdered 函数源码 def take(num: Int): Array[T] withScope {val scaleUpFactor Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num 0) {new Array[T](0)} else {val buf new ArrayBuffer[T]val totalParts this.partitions.lengthvar partsScanned 0while (buf.size num partsScanned totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry 1Lval left num - buf.sizeif (partsScanned 0) {// If we didnt find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry partsScanned * scaleUpFactor} else {// As left 0, numPartsToTry is always 1numPartsToTry Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p partsScanned.until(math.min(partsScanned numPartsToTry, totalParts).toInt)val res sc.runJob(this, (it: Iterator[T]) it.take(left).toArray, p)res.foreach(buf _.take(num - buf.size))partsScanned p.size}buf.toArray}} 函数说明 object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString( ))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(, ))// 1 2 3// takeOrdered方法返回的是一个有序数组第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T]即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()} } 5. aggregate 函数源码 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) U, combOp: (U, U) U): U withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp sc.clean(seqOp)val cleanCombOp sc.clean(combOp)val aggregatePartition (it: Iterator[T]) it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult (_: Int, taskResult: U) jobResult combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult} 函数说明 分区的数据通过初始值和分区内的数据进行聚合然后再和初始值进行分区间的数据聚合。 object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List(1, 2, 3, 4), 2)val i rdd.aggregate(0)(_ _, _ _)println(i)// 10// 0 1 2 3, 0 3 4 7// 0 3 7 10val i1 rdd.aggregate(10)(_ _, _ _)println(i1)// 40// 10 1 2 13, 10 3 4 17// 10 13 17 40sc.stop()} } 6. fold 函数源码; 略。 函数说明 折叠操作aggregate的简化版操作。 即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。 7. countByKey和countByValue countByKey函数源码 def countByKey(): Map[K, Long] self.withScope {self.mapValues(_ 1L).reduceByKey(_ _).collect().toMap} 返回值类型是map集合。即Map[K, long] K即是key-value的key类型Long则是key出现的次数。 countByValue函数源码 def countByValue()(implicit ord: Ordering[T] null): Map[T, Long] withScope {map(value (value, null)).countByKey()} 得知底层调用了map方法将rdd的每个数据映射为一个元组然后调用countByKey方法。 object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)))// 返回的是Map集合key是key-value的keyvalue是key出现的countval rdd_key rdd.countByKey()println(rdd_key)// Map(a - 2, b - 1, c - 1)// a作为key出现了两次b作为key出现了一次...val rdd_value rdd.countByValue()println(rdd_value)// Map((a,2) - 1, (b,2) - 1, (c,3) - 1, (a,1) - 1)// 底层调用了map方法和countbyKey方法sc.stop()} } 8. save相关算子 save相关算子包括 saveAsTextFile, saveAsObjectFile, saveAsSequenceFile 函数源码 def saveAsTextFile(path: String): Unit withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit withScope {this.mapPartitions(iter iter.grouped(10).map(_.toArray)).map(x (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ : CompressionCodec]] None): Unit self.withScope {def anyToWritable[U % Writable](u: U): Writable u// TODO We cannot force the return type of anyToWritable be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey self.keyClass ! _keyWritableClassval convertValue self.valueClass ! _valueWritableClasslogInfo(Saving as sequence file of type s(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName}) )val format classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf new JobConf(self.context.hadoopConfiguration)if (!convertKey !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey convertValue) {self.map(x (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey !convertValue) {self.map(x (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey convertValue) {self.map(x (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}} }函数说明 object Spark06_RDD_save_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)))// 保存在文件中rdd.saveAsTextFile(datas)// 序列化成对象保存在文件中rdd.saveAsObjectFile(datas)// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile(datas)sc.stop()} } 9. foreach 函数源码 底层一直在调用runJob函数。 def foreach(f: T Unit): Unit withScope {val cleanF sc.clean(f)sc.runJob(this, (iter: Iterator[T]) iter.foreach(cleanF))} 函数说明 分布式遍历RDD中的每一个元素调用该函数。 object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local[*]).setAppName(rdd_action)val sc new SparkContext(sparkConf)val rdd sc.makeRDD(List((a, 1), (b, 2), (c, 3), (a, 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端 // (a,1) // (b,2) // (c,3) // (a,2)println(*************)// 而foreach直接在executor端内存数据的打印rdd.foreach(println) // (c,3)//(a,2)//(a,1)//(b,2)sc.stop()} }
http://www.dnsts.com.cn/news/54337.html

相关文章:

  • 电子商务网站建设系统功能wordpress机械展示
  • 珠海市官网网站建设价格wordpress aws上集成环境
  • 建设银行的网站用户名是什么免费手机网站开发
  • 做电影网站解决版权问题惠州网站网站建设
  • 网站权重分析深圳专业做网站较好的公司
  • 什么网站系统好公司网站建设维护合同
  • 网站的总体风格包括企业做网站需要注意事项
  • 荆州 网站建设wordpress主题idown
  • 建设网站一般要多少钱精品课程网站建设毕业设计
  • 威海做企业网站的公司app开发外包网
  • 优设网下载优化公司股权结构
  • 网站建设面包屑导航条定制开发电商网站建设多少钱
  • 公司建设网站申请报告范文广东建站
  • 华侨城网站建设北滘企业网站开发
  • 中文购物网站模板大型网页设计
  • 义乌网站建设优化排名网站建设无锡
  • 服装网站建设项目实施报告软件开发全流程
  • 微信微网站模板下载四川企业品牌网站建设
  • 网站开发信息wordpress百度分享插件下载地址
  • 免费网站app哪个好游戏攻略网站开发
  • 网站设计首页微信商城怎么进入
  • 轴承 网站建设 企炬网站托管是什么
  • 涞源县住房和城乡建设局网站网站制作优化推广
  • 在线建站店铺seo是什么意思
  • 怎样做网站公司的销售vscode 网站开发教程
  • 昆山建站公司php网站支付宝接口
  • 网站建设备案信息wordpress不同语言
  • 网站建设吧云南网站设计哪家好
  • 7黄页网站建设中企动力官网 网站
  • 网站备案时网站没有内容可以网站一般在哪建设