Spark Event Bus Mechanism

The analysis below is based on the Spark 2.11 source; classes and methods annotated with @DeveloperApi may be implemented differently in other versions.

Spark's event bus receives events and delivers them to the corresponding listeners. When a Spark application starts, the SparkContext activates the bus for the running application, LiveListenerBus.
A partial class diagram around LiveListenerBus is shown below. Since Spark is written in Scala, the interfaces in the diagram represent the contract of a trait.

[Class diagram: the SparkListenerBus trait extends the ListenerBus trait, and AsyncEventQueue implements SparkListenerBus. LiveListenerBus aggregates AsyncEventQueue instances, and SparkContext aggregates LiveListenerBus. The abstract SparkListener class implements SparkListenerInterface and is extended by AppStatusListener, ExecutorAllocationListener, and EventLoggingListener; the SparkListener-related events implement SparkListenerEvent.]

Main logic
When the application starts, LiveListenerBus is instantiated in SparkContext. In addition to the internal listeners, the listeners named in the spark.extraListeners configuration entry are registered as well, and then the listener bus is started.
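For illustration, here is a minimal listener that could be wired in this way. The class name JobAuditListener and its package are hypothetical; Spark instantiates such classes by reflection, so they need a zero-argument (or single-SparkConf) constructor:

```scala
package com.example  // hypothetical package

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Hypothetical listener: needs a zero-arg (or SparkConf) constructor so that
// Spark can instantiate it from the class name in spark.extraListeners.
class JobAuditListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
}
```

With the class on the driver classpath, it would be registered at submit time via --conf spark.extraListeners=com.example.JobAuditListener.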
Inside LiveListenerBus, AsyncEventQueue serves as the core implementation: it dispatches events asynchronously to the registered SparkListeners.

LiveListenerBus maintains four kinds of AsyncEventQueue, dispatching different events to their own independent threads so that a backlog does not build up when listeners and events are numerous:

- eventLog: the event-log queue
- executorManagement: the executor-management queue
- appStatus: the application-status queue
- shared: the queue shared by non-internal listeners (registration onto it is sketched right after this list)
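Listeners added through the public SparkContext.addSparkListener developer API end up on the shared queue. A minimal sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

val sc = new SparkContext(new SparkConf().setAppName("bus-demo").setMaster("local[*]"))

// addSparkListener is a @DeveloperApi; in this line of the source it registers
// the listener on the "shared" AsyncEventQueue.
sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
    println(s"application ended at ${end.time}")
})
```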
Internally, AsyncEventQueue stores events in a LinkedBlockingQueue and starts a resident thread, dispatchThread, that forwards them.

[Flowchart: event sources post to LiveListenerBus, whose addToQueue routes events into the four AsyncEventQueues (eventLog, executorManagement, appStatus, shared); each queue owns its own eventQueue of pending events, its own set of listeners, and its own dispatchThread; start and stop act on all queues.]
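The core pattern here — a bounded blocking queue drained by a daemon thread until a poison-pill sentinel arrives — can be shown standalone. A simplified sketch, not Spark code:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified sketch of the AsyncEventQueue pattern: bounded queue + daemon
// dispatch thread + poison-pill shutdown.
object MiniQueueDemo {
  private val POISON = "__POISON__"
  private val queue = new LinkedBlockingQueue[String](10000)

  private val dispatchThread = new Thread("mini-dispatch") {
    setDaemon(true)
    override def run(): Unit = {
      var next = queue.take()          // blocks while the queue is empty
      while (next != POISON) {
        println(s"delivering: $next")  // stand-in for postToAll(event)
        next = queue.take()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    dispatchThread.start()
    // offer() returns false instead of blocking when the queue is full,
    // which is why a full queue leads to dropped events.
    (1 to 3).foreach(i => queue.offer(s"event-$i"))
    queue.put(POISON)                  // request shutdown
    dispatchThread.join()
  }
}
```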
Code walkthrough

org.apache.spark.util.ListenerBus

ListenerBus is a trait. Scala traits are similar to Java interfaces: like an interface, a trait can declare abstract methods and members; unlike an interface, it can also contain concrete methods and members.

```scala
package org.apache.spark.util

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.codahale.metrics.Timer

import org.apache.spark.internal.Logging

/**
 * Base trait for event buses: forwards events to the corresponding listeners.
 */
// [L <: AnyRef] is a type parameter; <: marks its upper bound.
// private[spark] restricts visibility to the spark package.
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // (L, Option[Timer]) is a tuple: each listener is stored with an optional timer.
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked private[spark] for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  protected def getTimer(listener: L): Option[Timer] = None

  /**
   * Add a listener to listen for events. This method is thread-safe and can be
   * called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener; it will receive no further events. This method is
   * thread-safe and can be called in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  /**
   * Can be overridden by subclasses if any extra cleanup is needed when removing
   * a listener. In particular, AsyncEventQueue uses this to clean up queues in
   * the LiveListenerBus.
   */
  def removeListenerOnError(listener: L): Unit = {
    removeListener(listener)
  }

  /**
   * Post the event to all registered listeners. Callers should guarantee that
   * postToAll is invoked in the same thread for all events.
   */
  def postToAll(event: E): Unit = {
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
        if (Thread.interrupted()) {
          throw new InterruptedException()
        }
      } catch {
        case ie: InterruptedException =>
          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
            s"Removing that listener.", ie)
          removeListenerOnError(listener)
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

  /**
   * Post an event to the specified listener. "onPostEvent" is guaranteed to be
   * called in the same thread for all listeners.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }
}
```
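To see the trait in isolation, here is a hypothetical toy bus that reuses the concrete postToAll and only supplies doPostEvent. Purely illustrative: ListenerBus is private[spark], so such code would have to live under an org.apache.spark package to compile.

```scala
package org.apache.spark.util

// Hypothetical handler type and bus, for illustration only.
trait StringHandler { def handle(msg: String): Unit }

class StringBus extends ListenerBus[StringHandler, String] {
  // The only abstract member: how to deliver one event to one listener.
  override protected def doPostEvent(listener: StringHandler, event: String): Unit =
    listener.handle(event)
}

object StringBusDemo {
  def main(args: Array[String]): Unit = {
    val bus = new StringBus
    bus.addListener(new StringHandler {
      def handle(msg: String): Unit = println(s"got: $msg")
    })
    bus.postToAll("hello")  // synchronously invokes every registered handler
  }
}
```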
org.apache.spark.scheduler.SparkListenerBus

```scala
package org.apache.spark.scheduler

import org.apache.spark.util.ListenerBus

/**
 * An event bus for SparkListenerEvents: extends ListenerBus and forwards
 * SparkListenerEvents to SparkListenerInterface implementations.
 * SparkListenerInterface is a trait that defines listener callbacks for events
 * in a Spark application's run cycle. SparkListenerEvent is the common trait
 * for events; the concrete run-cycle events are case classes implementing it.
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  // Route each event type to the matching listener callback.
  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }
}
```

SparkListener implements SparkListenerInterface and is its default implementation: it provides no-op implementations of all the event callbacks.
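Because SparkListener's callbacks are all no-ops, a concrete listener overrides only what it needs, and doPostEvent above routes each event case class to the matching method. A minimal sketch (the class name is hypothetical):

```scala
import org.apache.spark.scheduler._

// Only the callbacks of interest are overridden; everything else stays a no-op.
class StageTimingListener extends SparkListener {
  override def onStageSubmitted(s: SparkListenerStageSubmitted): Unit =
    println(s"stage ${s.stageInfo.stageId} submitted")

  override def onStageCompleted(s: SparkListenerStageCompleted): Unit = {
    val info = s.stageInfo
    // submissionTime/completionTime are Options, set once the stage has run.
    val millis = for {
      start <- info.submissionTime
      end <- info.completionTime
    } yield end - start
    println(s"stage ${info.stageId} finished in ${millis.getOrElse(-1L)} ms")
  }
}
```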
The event storage and dispatch queue
org.apache.spark.scheduler.AsyncEventQueue
```scala
package org.apache.spark.scheduler

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import com.codahale.metrics.{Gauge, Timer}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

/**
 * An asynchronous queue for events. All events posted to this queue will be
 * delivered to the child listeners in a separate thread.
 *
 * Delivery will only begin when the start() method is called. The stop() method
 * should be called when no more events need to be delivered.
 */
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  import AsyncEventQueue._

  // Holds the queued events (case classes extending SparkListenerEvent, as
  // described above); the capacity defaults to 10000.
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  // Number of unprocessed events. An event popped from eventQueue is not
  // necessarily fully processed yet, so a separate counter tracks them.
  private val eventCount = new AtomicLong()

  /** A counter for dropped events. */
  private val droppedEventsCounter = new AtomicLong(0L)

  /** When `droppedEventsCounter` was last logged, in milliseconds. */
  @volatile private var lastReportTimestamp = 0L

  private val logDroppedEvent = new AtomicBoolean(false)

  private var sc: SparkContext = null

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)

  private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
  private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")

  // Remove the queue size gauge first, in case it was created by a previous
  // incarnation of this queue that was removed from the listener bus.
  metrics.metricRegistry.remove(s"queue.$name.size")
  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
    override def getValue: Int = eventQueue.size()
  })

  // Resident dispatch thread: keeps calling dispatch() to forward events.
  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    var next: SparkListenerEvent = eventQueue.take()
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        // Forward the event to all registered listeners through the bus.
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  }

  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
  }

  /**
   * Start the dispatchThread that dispatches events to the listeners.
   *
   * @param sc Used to stop the SparkContext in case the async dispatcher fails.
   */
  private[scheduler] def start(sc: SparkContext): Unit = {
    if (started.compareAndSet(false, true)) {
      this.sc = sc
      dispatchThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

  /**
   * Stop the listener bus. It will wait until the queued events have been
   * processed, but new events will be dropped. A POISON_PILL is enqueued;
   * when dispatchThread reads the POISON_PILL, it stops dispatching events.
   */
  private[scheduler] def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    if (stopped.compareAndSet(false, true)) {
      eventCount.incrementAndGet()
      eventQueue.put(POISON_PILL)
    }
    if (Thread.currentThread() != dispatchThread) {
      dispatchThread.join()
    }
  }

  // Add an event to the queue; if the queue is full, drop the event and log it.
  // This is a producer/consumer model: the producer drops events when the queue
  // is full, and the consumer waits when the queue is empty.
  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    eventCount.incrementAndGet()
    if (eventQueue.offer(event)) {
      return
    }

    // Reached when the event could not be added to eventQueue.
    eventCount.decrementAndGet()
    droppedEvents.inc()
    droppedEventsCounter.incrementAndGet()
    if (logDroppedEvent.compareAndSet(false, true)) {
      logError(s"Dropping event from queue $name. " +
        "This likely means one of the listeners is too slow and cannot keep up with " +
        "the rate at which tasks are being started by the scheduler.")
    }
    logTrace(s"Dropping event $event")

    val droppedCount = droppedEventsCounter.get
    if (droppedCount > 0) {
      // Throttle the warning to at most once per minute.
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          val previous = new java.util.Date(prevLastReportTimestamp)
          logWarning(s"Dropped $droppedCount events from $name since $previous.")
        }
      }
    }
  }

  /**
   * For testing only. Wait until there are no more events in the queue.
   */
  def waitUntilEmpty(deadline: Long): Boolean = {
    while (eventCount.get() != 0) {
      if (System.currentTimeMillis > deadline) {
        return false
      }
      Thread.sleep(10)
    }
    true
  }

  override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
    bus.removeListener(listener)
  }
}

private object AsyncEventQueue {
  val POISON_PILL = new SparkListenerEvent() { }
}
```
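The queue length comes from LISTENER_BUS_EVENT_QUEUE_CAPACITY, which in this line of the source corresponds to the spark.scheduler.listenerbus.eventqueue.capacity setting (default 10000). If the dropped-event warnings above appear, the capacity can be raised, trading driver memory for fewer drops; a sketch:

```scala
import org.apache.spark.SparkConf

// Raise the per-queue capacity (default 10000) when listeners fall behind;
// larger queues trade driver memory for fewer dropped events.
val conf = new SparkConf()
  .setAppName("bus-demo")
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "30000")
```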
The Spark application event bus

org.apache.spark.scheduler.LiveListenerBus
```scala
package org.apache.spark.scheduler

import java.util.{List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.DynamicVariable

import com.codahale.metrics.{Counter, MetricRegistry, Timer}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source

/**
 * The manager for SparkListenerEvents: asynchronously passes SparkListenerEvents
 * to registered SparkListeners.
 *
 * Until start() is called, all posted events are only buffered. Only after this
 * listener bus has started will events actually be propagated to all attached
 * listeners. This listener bus is stopped when stop() is called, and it will
 * drop further events after stopping.
 */
private[spark] class LiveListenerBus(conf: SparkConf) {

  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  // Indicates whether start() has been called (the bus has started).
  private val started = new AtomicBoolean(false)
  // Indicates whether stop() has been called (the bus has stopped).
  private val stopped = new AtomicBoolean(false)

  /** A counter for dropped events. */
  private val droppedEventsCounter = new AtomicLong(0L)

  /** When `droppedEventsCounter` was last logged, in milliseconds. */
  @volatile private var lastReportTimestamp = 0L

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

  // Visible for testing.
  @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()

  /** Add a listener to the queue shared by all non-internal listeners. */
  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }

  /** Add a listener to the executor management queue. */
  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }

  /** Add a listener to the application status queue. */
  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }

  /** Add a listener to the event log queue. */
  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }

  /**
   * Add a listener to a specific queue, creating a new queue if needed.
   * Queues are independent of each other (each uses a separate thread for
   * delivering events), allowing slower listeners to be somewhat isolated
   * from others.
   */
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    // Register with the queue if it already exists; otherwise create it first.
    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

  def removeListener(listener: SparkListenerInterface): Unit = synchronized {
    // Remove the listener from every queue it was added to, and stop queues
    // that have become empty.
    queues.asScala
      .filter { queue =>
        queue.removeListener(listener)
        queue.listeners.isEmpty()
      }
      .foreach { toRemove =>
        if (started.get() && !stopped.get()) {
          toRemove.stop()
        }
        queues.remove(toRemove)
      }
  }

  /** Post an event to all queues. */
  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    metrics.numEventsPosted.inc()

    // If the event buffer is null, it means the bus has been started and we can
    // avoid synchronizing and post events directly to the queues. This should be
    // the most common case during the life of the bus.
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }

    // Otherwise, need to synchronize to check whether the bus is started, to
    // make sure the thread calling start() picks up the new event.
    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }

    // If the bus was already started when the check above was made, just post
    // directly to the queues.
    postToQueues(event)
  }

  // Iterate over all queues and dispatch the event to each one.
  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }

  /**
   * Start each queue and post the events buffered in queuedEvents. Each queue
   * then starts consuming previously posted events and calls postToAll() to
   * deliver them to its listeners.
   *
   * This first sends out all of the buffered events posted before this listener
   * bus started, then listens for any additional events asynchronously while the
   * listener bus is still running. This should only be called once.
   *
   * @param sc Used to stop the SparkContext in case the listener thread dies.
   */
  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("LiveListenerBus already started.")
    }

    this.sparkContext = sc
    queues.asScala.foreach { q =>
      q.start(sc)
      queuedEvents.foreach(q.post)
    }
    queuedEvents = null
    metricsSystem.registerSource(metrics)
  }

  /**
   * Exposed for testing.
   */
  @throws(classOf[TimeoutException])
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis + timeoutMillis
    queues.asScala.foreach { queue =>
      if (!queue.waitUntilEmpty(deadline)) {
        throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.")
      }
    }
  }

  /**
   * Stop the listener bus. It will wait until the queued events have been
   * processed, but drop new events after stopping.
   */
  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop bus that has not yet started!")
    }

    if (!stopped.compareAndSet(false, true)) {
      return
    }

    synchronized {
      queues.asScala.foreach(_.stop())
      queues.clear()
    }
  }

  // For testing only.
  private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
    queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
  }

  // For testing only.
  private[spark] def listeners: JList[SparkListenerInterface] = {
    queues.asScala.flatMap(_.listeners.asScala).asJava
  }

  // For testing only.
  private[scheduler] def activeQueues(): Set[String] = {
    queues.asScala.map(_.name).toSet
  }
}

private[spark] object LiveListenerBus {
  // Allows for Context to check whether stop() call is made within listener thread
  val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)

  private[scheduler] val SHARED_QUEUE = "shared"

  private[scheduler] val APP_STATUS_QUEUE = "appStatus"

  private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"

  private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
}

private[spark] class LiveListenerBusMetrics(conf: SparkConf)
  extends Source with Logging {

  override val sourceName: String = "LiveListenerBus"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))

  // Guarded by synchronization.
  private val perListenerClassTimers = mutable.Map[String, Timer]()

  def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
    synchronized {
      val className = cls.getName
      val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)
      perListenerClassTimers.get(className).orElse {
        if (perListenerClassTimers.size >= maxTimed) {
          logError(s"Not measuring processing time for listener class $className because a " +
            s"maximum of $maxTimed listener classes are already timed.")
          None
        } else {
          perListenerClassTimers(className) =
            metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
          perListenerClassTimers.get(className)
        }
      }
    }
  }
}
```

When a Spark application starts, SparkContext starts the event bus LiveListenerBus for the running application:

```scala
  private def setupAndStartListenerBus(): Unit = {
    try {
      conf.get(EXTRA_LISTENERS).foreach { classNames =>
        val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
        listeners.foreach { listener =>
          listenerBus.addToSharedQueue(listener)
          logInfo(s"Registered listener ${listener.getClass().getName()}")
        }
      }
    } catch {
      case e: Exception =>
        try {
          stop()
        } finally {
          throw new SparkException(s"Exception when registering SparkListener", e)
        }
    }

    // Start the application's event bus.
    listenerBus.start(this, _env.metricsSystem)
    _listenerBusStarted = true
  }
```
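The buffer-then-replay behavior of post() and start() above is easy to miss: events posted before start() only land in queuedEvents, and start() replays them into every queue before normal delivery begins. A simplified sketch of just that pattern, not Spark code:

```scala
import scala.collection.mutable

// Simplified sketch of LiveListenerBus's buffer-then-replay startup: events
// posted before start() are buffered; start() replays them, and later posts
// go straight through.
class BufferingBus[E](deliver: E => Unit) {
  private val buffered = mutable.ListBuffer.empty[E]
  private var started = false

  def post(event: E): Unit = synchronized {
    if (started) deliver(event) else buffered += event
  }

  def start(): Unit = synchronized {
    started = true
    buffered.foreach(deliver)  // replay everything buffered so far
    buffered.clear()
  }
}

object BufferingBusDemo {
  def main(args: Array[String]): Unit = {
    val bus = new BufferingBus[String](e => println(s"deliver $e"))
    bus.post("early-event")  // buffered, not delivered yet
    bus.start()              // prints "deliver early-event"
    bus.post("late-event")   // prints "deliver late-event"
  }
}
```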