dede本地搭建网站,手术直播平台,wordpress supercache,京东企业门户Spark核心组件解析#xff1a;Executor、RDD与缓存优化
Spark Executor
Executor 是 Spark 中用于执行任务#xff08;task#xff09;的执行单元#xff0c;运行在 worker 上#xff0c;但并不等同于 worker。实际上#xff0c;Executor 是一组计算资源#xff08;如…Spark核心组件解析Executor、RDD与缓存优化
Spark Executor
Executor 是 Spark 中用于执行任务task的执行单元运行在 worker 上但并不等同于 worker。实际上Executor 是一组计算资源如 CPU 核心和内存的集合多个 executor 共享 worker 上的 CPU 和内存资源。
Executor 的功能 任务执行Executor 负责执行分配给它的任务并返回结果到 driver 程序。 缓存机制如果应用程序调用了 cache() 或 persist() 函数Executor 会通过 Block Manager 为RDD 提供缓存机制优化重复计算。 生命周期Executor 存在于整个 Spark 应用的生命周期内。
Executor 的创建
Spark 在以下几种情况下创建 Executor
当资源管理器为 Standalone 或 YARN且 CoarseGrainedExecutorBackend 进程接收到 RegisteredExecutor 消息时当使用 Mesos 资源管理器时MesosExecutorBackend 进程注册时在本地模式下当 LocalEndpoint 被创建时。
创建成功后日志会显示如下信息
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
心跳发送线程
Executor 会定期向 driver 发送心跳信号以确保连接活跃。心跳线程通常是一个调度线程池利用 ScheduledThreadPoolExecutor 来维持任务的实时性。
执行任务
Executor 通过 launchTask 方法来执行任务。这个方法会创建一个 TaskRunner 线程并在 Executor Task Launch Worker 线程池中执行任务。
private val threadPool ThreadUtils.newDaemonCachedThreadPool(Executor task launch worker)
Spark RDD (Resilient Distributed Dataset)
RDD 是 Spark 的基础数据结构表示一个不可变的分布式数据集。RDD 在集群中的各个节点上并行计算并且具有弹性容错性和分布式的特性。
RDD 的特性
弹性RDD 是容错的丢失的数据可以通过其父 RDD 重新计算。分布式RDD 的数据分布在集群的不同节点上支持分布式计算。不可修改RDD 一旦创建其数据不可修改这也保证了数据的一致性。分区RDD 会被划分为多个分区以便并行处理。
RDD 的创建方式
1并行化可以通过 SparkContext.parallelize() 方法从一个数据集合创建 RDD。 2从外部存储可以通过 SparkContext.textFile() 等方法从外部存储系统如 HDFS加载数据创建 RDD。 3从其他 RDD通过 Spark 的 Transformation 操作从已有的 RDD 创建新的 RDD。
RDD 操作
RDD 支持两种类型的操作
Transformation 操作转换如 map()、filter()返回新的 RDD。Action 操作行动如 count()、collect()触发实际计算并返回结果。
RDD 的容错性
RDD 提供容错能力。当某个节点失败时可以根据其父 RDD 的计算逻辑恢复丢失的数据。这是通过 DAG有向无环图和父 RDD 关系来实现的。
RDD 的持久化
RDD 可以使用 cache() 或 persist() 进行持久化存储缓存的 RDD 会存储在内存中若内存不足则溢写到磁盘避免重复计算。
RDD 的局限性
缺少内置优化引擎RDD 无法像 DataFrame 和 Dataset 一样利用 Spark 的 Catalyst 优化器进行自动优化。
性能问题随着数据量增大RDD 计算的性能可能下降尤其是与 JVM 垃圾回收和序列化相关的开销。
存储问题当内存不足时RDD 会将数据溢写到磁盘这会导致计算性能大幅下降。
创建 RDD 的例子
并行化创建 RDD
val data sc.parallelize(Seq(1, 2, 3, 4, 5))
val result data.map(_ * 2)
result.collect() // 返回 [2, 4, 6, 8, 10]
从外部存储创建 RDD
val rdd sc.textFile(hdfs://path/to/file)
从其他 RDD 创建 RDD
val newRdd oldRdd.filter(_ 10)
Spark RDD 缓存机制
Spark RDD 缓存是一种优化技术用于将中间计算结果存储在内存中以便在后续操作中复用从而减少重复计算提高性能。RDD 缓存可以显著加速一些需要迭代计算的应用特别是在机器学习和图计算等场景中。
持久化 RDD
持久化操作会将 RDD 的计算结果存储到内存中。这样每次对 RDD 进行操作时Spark 会直接使用内存中的数据而不必重新计算。通过持久化可以避免重复计算从而提高效率。
cache()cache() 是 persist() 的简化方法默认将 RDD 数据存储在内存中使用 MEMORY_ONLY 存储级别。persist()可以通过 persist() 方法选择不同的存储级别例如 MEMORY_ONLY、DISK_ONLY 等。unpersist()用于移除已缓存的数据释放内存。
RDD 持久化存储级别
Spark 提供了多种存储级别每种级别的存储方式不同根据具体的需求选择合适的存储级别。
存储级别使用空间CPU时间是否在内存中是否在磁盘上备注MEMORY_ONLY高低是否默认级别数据未序列化全部存储在内存中MEMORY_ONLY_2高低是否数据存储 2 份MEMORY_ONLY_SER低高是否数据序列化存储占用更少内存MEMORY_AND_DISK高中等部分部分内存不够时数据溢写到磁盘MEMORY_AND_DISK_SER低高部分部分数据序列化内存不够时溢写到磁盘DISK_ONLY低高否是数据仅存储在磁盘中OFF_HEAP----存储在堆外内存目前为试验性选项
副本机制
带有 _2 后缀的存储级别表示在每个节点上缓存数据的副本。副本机制是为了提高容错性。如果某个节点的数据丢失Spark 可以从其他节点的副本中恢复数据而不必重新计算。
缓存策略的选择
MEMORY_ONLY适用于内存足够大的场景避免序列化和磁盘 I/O开销。性能较高但如果内存不足可能会导致计算失败。MEMORY_ONLY_SER适用于内存较为紧张的场景将数据进行序列化后保存在内存中减少内存占用但会增加序列化的开销。MEMORY_AND_DISK适用于内存不足的场景数据无法完全存储在内存时会溢写到磁盘确保数据不会丢失。DISK_ONLY适用于数据量极大的情况全部数据存储在磁盘中性能较低但可以处理大规模数据。
如何使用 Spark RDD 缓存
缓存 RDD
val rdd sc.textFile(data.txt)
rdd.cache() // 使用默认的 MEMORY_ONLY 存储级别
选择存储级别
rdd.persist(StorageLevel.MEMORY_AND_DISK) // 选择 MEMORY_AND_DISK 存储级别
清除缓存
rdd.unpersist() // 移除缓存
Spark 键值对 RDD
Spark 通过 PairRDD 处理键值对类型的数据提供了多种用于处理键值对数据的转换操作。
如何创建键值对 RDD 通过 map 操作将普通 RDD 转换为键值对 RDD。 Scala 示例
val lines sc.textFile(data.txt)
val pairs lines.map(s (s, 1))
Python 示例
lines sc.textFile(data.txt)
pairs lines.map(lambda s: (s, 1))
常见的键值对操作 reduceByKey(func)对具有相同键的值进行规约操作。
val pairs sc.parallelize(List((1,2),(3,4),(3,6)))
val result pairs.reduceByKey((a, b) a b)
println(result.collect().mkString(,))
groupByKey()对具有相同键的值进行分组。
val result pairs.groupByKey()
println(result.collect().mkString(,))
mapValues()对值进行转换操作但不改变键。
val result pairs.mapValues(x x 1)
println(result.collect().mkString(,))
sortByKey()按键排序。
val sorted pairs.sortByKey()
println(sorted.collect().mkString(,))
对两个 RDD 的操作 join()连接两个 RDD返回键相同的数据。
val other sc.parallelize(List((3, 9)))
val joined pairs.join(other)
println(joined.collect().mkString(,))
leftOuterJoin() 和 rightOuterJoin()左外连接和右外连接分别确保第一个和第二个 RDD 中的键存在。
val leftJoined pairs.leftOuterJoin(other)
val rightJoined pairs.rightOuterJoin(other)
println(leftJoined.collect().mkString(,))
println(rightJoined.collect().mkString(,))
通过合理使用 Spark 的缓存和键值对 RDD 操作可以显著提升大数据计算的效率尤其是在迭代计算和需要频繁访问中间数据的场景下。