当前位置: 首页 > news >正文

济宁网站建设济宁最好的网络推广方式

济宁网站建设济宁,最好的网络推广方式,福建工程网站建设团队,北京商标注册一、checkpoint概述 checkpoint#xff0c;是Spark提供的一个比较高级的功能。有时候#xff0c;我们的Spark任务#xff0c;比较复杂#xff0c;从初始化RDD开始#xff0c;到最后整个任务完成#xff0c;有比较多的步骤#xff0c;比如超过10个transformation算子。而…一、checkpoint概述 checkpoint是Spark提供的一个比较高级的功能。有时候我们的Spark任务比较复杂从初始化RDD开始到最后整个任务完成有比较多的步骤比如超过10个transformation算子。而且整个任务运行的时间也特别长比如通常要运行1~2个小时。在这种情况下就比较适合使用checkpoint功能了。因为对于特别复杂的Spark任务有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失虽然之前持久化过但是还是导致数据丢失了。那么也就是说出现失败的时候没有容错机制所以当后面的transformation算子又要使用到该RDD时就会发现数据丢失了此时如果没有进行容错处理的话那么就需要再重新计算一次数据了。所以针对这种Spark Job如果我们担心某些关键的在后面会反复使用的RDD因为节点故障导致数据 丢失那么可以针对该RDD启动checkpoint机制实现容错和高可用 如何使用checkpoint 1首先要调用SparkContext的setCheckpointDir()方法设置一个容错的文件系统的目录比如HDFS然后对RDD调用checkpoint()方法。 2最后在RDD所在的job运行结束之后会启动一个单独的job将checkpoint设置过的RDD的数据写入之 前设置的文件系统中。 二、RDD的checkpoint流程 1SparkContext设置checkpoint目录用于存放checkpoint的数据对RDD调用checkpoint方法然后它就会被RDDCheckpointData对象进行管理此时这个RDD的checkpoint状态会被设置为Initialized。 2待RDD所在的job运行结束会调用job中最后一个RDD的doCheckpoint方法该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD并将其checkpoint状态从Initialized设置为 CheckpointingInProgress 3启动一个单独的job来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作也就是将其数据写入checkpoint目录 4将RDD数据写入checkpoint目录之后会将RDD状态改变为Checkpointed并且还会改变RDD的血缘关系即会清除掉RDD所有依赖的RDD最后还会设置其父RDD为新创建的CheckpointRDD 三、checkpoint与持久化的区别 1lineage是否发生改变。 lineage血缘关系说的就是RDD之间的依赖关系持久化只是将数据保存在内存中或者本地磁盘文件中RDD的lineage(血缘关系)是不变的Checkpoint执行之后RDD就没有依赖的RDD了也就是它的lineage改变了。 2丢失数据的可能性。 持久化的数据丢失的可能性较大如果采用 persist 把数据存在内存中的话虽然速度最快但是也是最不可靠的就算放在磁盘上也不是完全可靠的因为磁盘也会损坏。Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低 建议对需要checkpoint的RDD先执行persist(StorageLevel.DISK_ONLY) 为什么呢 因为默认情况下如果某个RDD没有持久化但是设置了checkpoint那么这个时候本来Spark任务已经执行结束了但是由于中间的RDD没有持久化在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话就需要重新计算这个RDD的数据再将其checkpoint到外部存储系统中。如果对需要checkpoint的rdd进行了基于磁盘的持久化那么后面进行checkpoint操作时就会直接从磁盘上读取rdd的数据了就不需要重新再计算一次了这样效率就高了。那在这能不能使用基于内存的持久化呢当然是可以的不过没那个必要。 四、checkpoint的使用 1. scala代码 package com.sanqian.scalaimport org.apache.spark.api.java.StorageLevels import org.apache.spark.{SparkConf, SparkContext}object CheckPointScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(CheckPointScala)val sc new SparkContext(conf)if (args.length 0) {System.exit(100)}val outoutPath args(0)// 1.设置checkpoint目录\sc.setCheckpointDir(hdfs://bigdata01:9000/chk001)val dataRDD sc.textFile(hdfs://bigdata01:9000/hadoop)dataRDD.persist(StorageLevels.DISK_ONLY)// 2.对RDD执行checkpoint操作dataRDD.checkpoint()dataRDD.flatMap(_.split( )).map((_, 1)).reduceByKey(_ _).saveAsTextFile(outoutPath)sc.stop()} }2. Java代码 package com.sanqian.java;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2;import java.util.Arrays; import java.util.Iterator;public class CheckPointJava {public static void main(String[] args) {SparkConf conf new SparkConf();conf.setAppName(CheckPointJava);JavaSparkContext sc new JavaSparkContext(conf);if (args.length 0){System.exit(100);}String outputPath args[0];// 1.设置checkpoint目录sc.setCheckpointDir(hdfs://bigdata01:9000/chk001);JavaRDDString rdd sc.textFile(hdfs://bigdata01:9000/hadoop);// 2.对RDD执行checkpoint操作rdd.checkpoint();rdd.flatMap(new FlatMapFunctionString, String() {Overridepublic IteratorString call(String line) throws Exception {return Arrays.asList(line.split( )).iterator();}}).mapToPair(new PairFunctionString, String, Integer() {Overridepublic Tuple2String, Integer call(String word) throws Exception {return new Tuple2String, Integer(word, 1);}}).reduceByKey(new Function2Integer, Integer, Integer() {Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 v2;}}).saveAsTextFile(outputPath);sc.stop();} }3. 打包代码 1将pom.xml中的spark-core的依赖设置为provided然后编译打包 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion2.4.3/versionscopeprovided/scope/dependency 2D:\ProgramData\IdeaProjects\db_sparkmvn clean package -DskipTests 3将打包的jar包上传到bigdata04的/data/soft/sparkjars目录创建一个新的spark-submit脚本 spark-submit \ --class $1 \ --master yarn \ --deploy-mode cluster \ --executor-memory 1G \ --num-executors 1 \ db_spark-1.0-SNAPSHOT.jar \ $2 4提交任务 sh lwx_run.sh com.sanqian.scala.CheckPointScala /out-chk003 执行成功之后可以到 setCheckpointDir 指定的目录中查看一下可以看到目录中会生成对应的文件保存rdd中的数据只不过生成的文件不是普通文本文件直接查看文件中的内容显示为乱码。
http://www.dnsts.com.cn/news/231638.html

相关文章:

  • 传销公司做网站什么罪名上海网站建设公司sky
  • 西安建设网站办公电脑租赁平台
  • 山东住房与城乡建设部网站苏州专业网站建设设计公司排名
  • 公司建设网站的费用苏州小程序开发
  • 西安做网站商标眉山网站设计
  • php网站开发技术代码新闻稿发布平台
  • 零基础学建网站网站建设主管的策划案
  • 中国建设企业银行网站首页Wordpress虚拟网址
  • 湛江市住房和城乡建设网站在线制作图片渐变色
  • muse做网站arcengine网站开发
  • 做律师咨询网站三只松鼠的网络营销方式
  • 观澜网站制作多个 管理 wordpress
  • 企业网站建设公司电话wordpress访问速度太慢
  • 国外教做蛋糕的网站乐清本地网站
  • 网站建设公司服务公司北京网络技术有限公司
  • 深圳有做网站的公司公司网站建设哪个好
  • 免费做产品宣传的网站平面图在线设计
  • 做gif动图的网站犯法吗廊坊制作网站模板建站公司
  • 龙岗区网站建设事件营销ppt
  • 网站导航栏字体平台商城网站建设
  • 高校档案室网站建设二维码制作网站
  • 贺州同城购物网站建设龙胜网站建设公司
  • 网站开发的编程语言有哪些在线制作图片的软件
  • 好的空间网站银川网站seo
  • 梅河口市建设局网站广东广实建设有限公司网站
  • 视频网站做视频容易火写小说赚钱的网站
  • 小型电子商务企业网站建设国外无版权图片网站
  • 公司制作网站google下载app
  • 手机网站营销wordpress 远程媒体库
  • 网站建设 三牛网站原型设计工具