I. Spark's Three Persistence Mechanisms
1. cache
cache is a simplified form of persist: it stores the RDD in memory so that later accesses are fast, which speeds up computation. cache is lazy, meaning it only takes effect when an action operator runs.
2. persist
persist offers a choice of storage levels (disk only, memory only, memory and disk, memory and disk with replicas, serialized in memory and/or on disk, off-heap), so the level can be matched to the use case; the built-in constants are listed in the quick reference below.
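These combinations correspond to the predefined constants in org.apache.spark.storage.StorageLevel. A non-exhaustive quick reference (the constant names come from the Spark API; the comments are informal summaries):
import org.apache.spark.storage.StorageLevel

StorageLevel.DISK_ONLY            // disk only
StorageLevel.MEMORY_ONLY          // deserialized objects in memory (what cache() uses)
StorageLevel.MEMORY_ONLY_SER      // serialized bytes in memory
StorageLevel.MEMORY_AND_DISK      // memory first, spill the rest to disk
StorageLevel.MEMORY_AND_DISK_2    // memory and disk, replicated on 2 executors
StorageLevel.MEMORY_AND_DISK_SER  // memory and disk, serialized
StorageLevel.OFF_HEAP             // off-heap memory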
3. checkpoint
checkpoint saves the data durably and is used to reduce the fault-tolerance cost of long lineage chains. A checkpoint keeps not only the data but also the operator chain that produced it, so when data needs to be recovered it can be recomputed from those operations rather than depending solely on the original input. Checkpoint data is also kept after the job finishes and can be reused by later computations.
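The lineage truncation can be observed by comparing toDebugString before and after the checkpoint is materialized; a minimal sketch, assuming an existing SparkContext sc and an illustrative checkpoint directory:
sc.setCheckpointDir("/tmp/ckpt-demo")   // illustrative path
val rdd = sc.parallelize(1 to 100).map(_ * 2).filter(_ % 4 == 0)
println(rdd.toDebugString)              // long lineage: filter <- map <- parallelize
rdd.checkpoint()                        // only marks the RDD; nothing is written yet
rdd.count()                             // first action: runs the job, then writes the checkpoint
println(rdd.toDebugString)              // lineage now starts from the checkpoint data instead of the original parents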
II. Usage Examples
1. cache
// Build some data
val data: RDD[Int] = sc.parallelize(1 to 10000)
// A simple transformation
val tempRdd: RDD[(String, Int)] = data.map(num => if (num % 2 == 0) ("even", num) else ("odd", num))
// Cache it
tempRdd.cache()
// Run it by calling an action operator
tempRdd.foreach(println)
Let's look at how tempRdd is stored.
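Besides the Storage tab of the web UI, the storage state can also be checked from code; a small sketch using getStorageLevel and getPersistentRDDs (the printed description is just an example of what to expect):
// After the action has run:
println(tempRdd.getStorageLevel.description)      // e.g. "Memory Deserialized 1x Replicated"
// Every RDD currently registered as persistent, keyed by RDD id
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"$id -> ${rdd.getStorageLevel}")
}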
2. persist
// Build some data
val data: RDD[Int] = sc.parallelize(1 to 10000)
// A simple transformation
val tempRdd: RDD[(String, Int)] = data.map(num => if (num % 2 == 0) ("even", num) else ("odd", num))
// Persist it
tempRdd.persist(StorageLevel.MEMORY_AND_DISK)
// Run it by calling an action operator
tempRdd.foreach(println)
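The counterpart of persist is unpersist, which drops the cached blocks once they are no longer needed; a short sketch:
// Release the cached blocks when tempRdd is no longer needed;
// blocking = false returns immediately and removes the blocks asynchronously.
tempRdd.unpersist(blocking = false)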
3. checkpoint
// Before using checkpoint, a checkpoint directory must be set on the SparkContext
sc.setCheckpointDir("./local-spark/checkpoint-data")
// Build some data
val data: RDD[Int] = sc.parallelize(1 to 10000)
// A simple transformation
val tempRdd: RDD[(String, Int)] = data.map(num => if (num % 2 == 0) ("even", num) else ("odd", num))
// Persist it
tempRdd.persist(StorageLevel.MEMORY_AND_DISK)
// Create the checkpoint (this will trigger an extra job)
tempRdd.checkpoint()
// Run it by calling an action operator
tempRdd.foreach(println)
From the history server UI we can see that this program launched two jobs; the source code analysis below explains why. Looking at the DAGs of the two jobs, we can see that the same computation ran twice, which is why a persist is usually added before checkpoint to speed things up.
Below is the DAG when persist is added before checkpoint: there are still two jobs, but the dot on tempRdd has changed color, which means the steps before tempRdd no longer need to be recomputed.
III. Source Code Analysis
1. cache
// Persist this RDD with the default storage level (MEMORY_ONLY)
def cache(): this.type = persist()
// Under the hood, cache just calls persist
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
2. persist
RDD
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // Set this RDD's storage level to persist its values across operations after the first time it is computed.
  // This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
  // Local checkpointing is an exception.
  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already marked this RDD for persisting.
      // Here we should override the old storage level with the one the user explicitly requested (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

  // Mark this RDD for persisting using the specified level.
  // newLevel      the target storage level
  // allowOverride whether to override any existing level with the new one
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // To re-assign an RDD's storage level, allowOverride must be set to true
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      // Register this RDD to be persisted in memory and/or disk storage
      sc.persistRDD(this)
    }
    // Record the storage level on this RDD so that tasks can fetch the cached data directly and speed up computation
    storageLevel = newLevel
    this
  }

  // Nested iterator computation: if this RDD is persisted, fetch the cached data directly
  // and wrap it in an iterator for downstream RDDs to use.
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  // Gets or computes an RDD partition
  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    // This method is called on executors, so we need to call SparkEnv.get instead of sc.env to obtain the BlockManager.
    // Next, look at BlockManager's getOrElseUpdate method below.
    // The last argument is a function that is invoked to produce the block if it is not already cached.
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
      readCachedBlock = false
      computeOrReadCheckpoint(partition, context)
    }) match {
      // Block hit.
      case Left(blockResult) =>
        if (readCachedBlock) {
          val existingMetrics = context.taskMetrics().inputMetrics
          existingMetrics.incBytesRead(blockResult.bytes)
          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        } else {
          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
      // Need to compute the block.
      case Right(iter) =>
        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
    }
  }

  // Called to produce the block when it is not in the cache
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
    if (isCheckpointedAndMaterialized) {
      // If checkpointed and materialized, read directly from the checkpoint parent
      firstParent[T].iterator(split, context)
    } else {
      // Otherwise keep computing: the nested iterator calls walk back until they hit a persisted block,
      // a shuffle, or the original data source
      compute(split, context)
    }
  }
}
SparkContext
class SparkContext(config: SparkConf) extends Logging {
  // Keeps track of all persisted RDDs
  private[spark] val persistentRdds = {
    val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
    map.asScala
  }

  private[spark] def persistRDD(rdd: RDD[_]): Unit = {
    persistentRdds(rdd.id) = rdd
  }
}
BlockManager
private[spark] class BlockManager(
    val executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    externalBlockStoreClient: Option[ExternalBlockStoreClient])
  extends BlockDataManager with BlockEvictionHandler with Logging {

  // Retrieves the given block if it exists, otherwise calls the provided
  // makeIterator method to compute the block, persists it, and returns its values.
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // The block was not found, so it has to be computed; if the RDD is marked for persistence, it is persisted below
    }
    // Initially we hold no locks on this block.
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully stored.
        // Therefore we now hold a read lock on the block.
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not have been
          // evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call, and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number of
        // lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so that
        // they can decide what to do with the values (e.g. process them without caching).
        Right(iter)
    }
  }

  // Puts the given block into one of the block stores according to the given level,
  // replicating the values if necessary.
  // If the block already exists, this method will not overwrite it.
  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeNs = System.nanoTime()
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
      // Size of the block in bytes
      var size = 0L
      // The storage level includes memory
      if (level.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // if the memory store can't hold it, we'll drop it to disk later.
        // The storage level asks for deserialized values
        if (level.deserialized) {
          // Try to put the given block in the memory store as values
          memoryStore.putIteratorAsValues(blockId, iterator(), level.memoryMode, classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              // Not enough space to unroll this block; drop it to disk if the level also allows disk
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else { // The storage level asks for serialized values
          // Try to put the given block in the memory store as bytes
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              // Not enough space to unroll this block; drop it to disk if the level also allows disk
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  partiallySerializedValues.finishWritingToStream(out)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }
      } else if (level.useDisk) { // The storage level uses disk without memory
        diskStore.put(blockId) { channel =>
          val out = Channels.newOutputStream(channel)
          serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }
      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory or disk store, tell the master about it
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug(s"Put block $blockId locally took ${Utils.getUsedTimeNs(startTimeNs)}")
        // The storage level requests more than one replica
        if (level.replication > 1) {
          val remoteStartTimeNs = System.nanoTime()
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
            scala.reflect.classTag[Any]
          } else {
            classTag
          }
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.dispose()
          }
          logDebug(s"Put block $blockId remotely took ${Utils.getUsedTimeNs(remoteStartTimeNs)}")
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }
}
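The branches above are driven entirely by the flags on the StorageLevel. As a quick illustration, here are the flag values for MEMORY_AND_DISK_SER_2, one of the built-in levels, and the branch each one selects:
import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK_SER_2
level.useMemory     // true  -> try the memoryStore first
level.deserialized  // false -> putIteratorAsBytes (store serialized bytes)
level.useDisk       // true  -> fall back to the diskStore when memory is insufficient
level.replication   // 2     -> replicate() sends a copy to another executor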
3. checkpoint
RDD
// Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with
// SparkContext#setCheckpointDir, and all references to its parent RDDs will be removed. This function must
// be called before any job has been executed on this RDD. It is strongly recommended that this RDD is
// persisted in memory, otherwise saving it to a file will require recomputation.
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    // No checkpoint directory has been set in the SparkContext, which is why a checkpoint
    // directory must be set via sc before calling this
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
ReliableRDDCheckpointData
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {

  // ........ omitted ..........

  // Materializes this RDD and writes its content to a reliable DFS.
  // This is called immediately after the first action invoked on this RDD completes.
  protected override def doCheckpoint(): CheckpointRDD[T] = {
    // Write the RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

    // Optionally clean up the checkpoint files if the reference goes out of scope
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }
    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }
}
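The spark.cleaner.referenceTracking.cleanCheckpoints flag read above defaults to false, so checkpoint files normally outlive the application. If automatic cleanup is wanted once the RDD goes out of scope, the flag can be enabled on the SparkConf, roughly like this (the app name is only illustrative):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("checkpoint-demo")  // illustrative
  .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")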
ReliableCheckpointRDD
private[spark] object ReliableCheckpointRDD extends Logging {

  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
    // This is expensive because it unnecessarily computes the RDD again,
    // which is why persist is usually called before checkpoint
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      // Write the partitioner to the given RDD checkpoint directory. This is done on a best-effort basis;
      // any exception while writing the partitioner is caught, logged and ignored.
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    // An RDD that reads from the checkpoint files previously written to reliable storage
    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
    }
    newRDD
  }
}
When does the RDD actually get checkpointed?
Only after the job the RDD belongs to has finished is the RDD checkpointed.
class SparkContext(config: SparkConf) extends Logging {

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    // Run the job
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    // Recursively walk up the parent RDDs to see whether any of them need to be checkpointed
    rdd.doCheckpoint()
  }
}

abstract class RDD[T: ClassTag]( ... ) extends Serializable with Logging {

  // Recursive function
  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
            // We could collect all the RDDs that need to be checkpointed and then checkpoint them in parallel.
            // Checkpoint the parents first, because our lineage will be truncated after we checkpoint ourselves.
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }
}
Summary
1. Calling checkpoint() on an RDD only marks that RDD for checkpointing.
2. The job that the RDD belongs to runs.
3. After the job finishes, Spark starts from the job's final RDD and recursively walks up its parents, finds every RDD that was marked for checkpointing, and calls runJob again to launch a task that actually checkpoints those RDDs.
That is why we usually persist an RDD before checkpointing it.
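Putting the summary together, a commonly used pattern looks roughly like the sketch below; rawRdd and heavyTransform are placeholders, and the checkpoint directory is only illustrative:
sc.setCheckpointDir("hdfs:///tmp/ckpt")          // use a reliable file system in production
val expensive = rawRdd.map(heavyTransform)       // placeholder for some costly lineage
expensive.persist(StorageLevel.MEMORY_AND_DISK)  // 1) cache it so the checkpoint job does not recompute the lineage
expensive.checkpoint()                           // 2) mark it for checkpointing
expensive.count()                                // 3) first action: runs the job, then writes the checkpoint
// ... later jobs reuse the cached / checkpointed data ...
expensive.unpersist()                            // optional: drop the cache once the checkpoint exists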