I. Context
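In the post "Kafka-broker: coarse-grained startup flow" we walked through the overall startup flow of a broker. At that point no broker holds the controller role yet, so let's look at how the controller gets elected.
II. Setting up ZooKeeper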
ZooKeeper is an open-source distributed coordination service, used mainly to coordinate and manage the nodes of a distributed system. Kafka's controller election relies on it as well.

    override def startup(): Unit = {
      //....
      initZkClient(time)
      configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
      //....
    }

    private def initZkClient(time: Time): Unit = {
      // config.zkConnect
      // zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka
      info(s"Connecting to zookeeper on ${config.zkConnect}")
      _zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)
      // Pre-create the top-level paths in ZK if needed.
      _zkClient.createTopLevelPaths()
    }

    def createTopLevelPaths(): Unit = {
      // Create the persistent ZK paths
      ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)
    }

    // Make sure a persistent path exists in ZK
    def makeSurePersistentPathExists(path: String): Unit = {
      createRecursive(path, data = null, throwIfPathExists = false)
    }

1. KafkaZkClient
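KafkaZkClient provides higher-level, Kafka-specific operations on top of kafka.zookeeper.ZooKeeperClient.
Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.), and in some cases it returns instances of classes from the calling packages.

    def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {
      //...
      KafkaZkClient(...)
    }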
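2. AdminZkClient
It provides the admin-related methods for interacting with ZooKeeper.

    class AdminZkClient(...) {
      // Create a topic
      def createTopic(...) {...}

      // Get broker metadata
      def getBrokerMetadatas(...) {...}

      // Create a topic and optionally validate its parameters. Note that TopicCommand also uses this method.
      def createTopicWithAssignment(...) {...}

      // Validate topic creation parameters
      def validateTopicCreate(...) {...}

      // Delete a topic: creates the delete path for the given topic
      def deleteTopic(...) {...}

      // Add partitions to an existing topic, with an optional replica assignment. Note that TopicCommand uses this method.
      def addPartitions(...) {...}

      // Parse a broker from an entity name to an integer id
      def parseBroker(...) {...}

      //.....
    }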
3. ZkConfigRepository
The ZooKeeper-backed configuration repository, that is, the configuration Kafka keeps in ZooKeeper.

    class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {
      override def config(configResource: ConfigResource): Properties = {
        //....
        // Read the data under the ZooKeeper path and wrap it as an entity config.
        // Entity types: topic, broker, client-id, user, user/clients/client-id, ip
        adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)
      }
    }
4. ZkData

    object ZkData {
      // These are the persistent ZK paths that should exist when a Kafka broker starts.
      val PersistentZkPaths: Seq[String] = Seq(
        ConsumerPathZNode.path, // old consumer path
        BrokerIdsZNode.path,
        TopicsZNode.path,
        ConfigEntityChangeNotificationZNode.path,
        DeleteTopicsZNode.path,
        BrokerSequenceIdZNode.path,
        IsrChangeNotificationZNode.path,
        ProducerIdBlockZNode.path,
        LogDirEventNotificationZNode.path
      ) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
    }

    // Path used by the old consumer in ZK
    object ConsumerPathZNode {
      def path = "/consumers"
    }

    object BrokerIdsZNode {
      def path = s"${BrokersZNode.path}/ids"
      def encode: Array[Byte] = null
    }

    object TopicsZNode {
      def path = s"${BrokersZNode.path}/topics"
    }

    object ConfigEntityChangeNotificationZNode {
      def path = s"${ConfigZNode.path}/changes"
    }

    object DeleteTopicsZNode {
      def path = s"${AdminZNode.path}/delete_topics"
    }

    object BrokerSequenceIdZNode {
      def path = s"${BrokersZNode.path}/seqid"
    }

    object IsrChangeNotificationZNode {
      def path = "/isr_change_notification"
    }

    object ProducerIdBlockZNode {
      val CurrentVersion: Long = 1L
      def path = "/latest_producer_id_block"
      def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {
        Json.encodeAsBytes(Map(
          "version" -> CurrentVersion,
          "broker" -> producerIdBlock.assignedBrokerId,
          "block_start" -> producerIdBlock.firstProducerId.toString,
          "block_end" -> producerIdBlock.lastProducerId.toString).asJava)
      }
    }

    object LogDirEventNotificationZNode {
      def path = "/log_dir_event_notification"
    }
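To make the list concrete, here is a minimal sketch that takes the nine fixed paths (as they appear in the summary of this post) and shows which znodes a recursive create would end up touching, parents first. The names `PersistentPathsSketch`, `ancestorsAndSelf` and `createAll` are made up for illustration, an in-memory set stands in for ZooKeeper, and the per-entity-type config paths from `ConfigType.ALL` are left out. It is not Kafka's `createRecursive`.

```scala
object PersistentPathsSketch {
  // The fixed persistent paths, written out as absolute znode paths
  val persistentPaths: Seq[String] = Seq(
    "/consumers", "/brokers/ids", "/brokers/topics", "/config/changes",
    "/admin/delete_topics", "/brokers/seqid", "/isr_change_notification",
    "/latest_producer_id_block", "/log_dir_event_notification")

  // Every prefix of a path, parents first:
  // "/brokers/ids" gives Seq("/brokers", "/brokers/ids")
  def ancestorsAndSelf(path: String): Seq[String] = {
    val parts = path.split("/").filter(_.nonEmpty)
    parts.indices.map(i => parts.take(i + 1).mkString("/", "/", ""))
  }

  // Hypothetical stand-in for ZooKeeper: remembers each znode once, in creation order.
  // Paths that already exist are silently skipped (like throwIfPathExists = false).
  def createAll(paths: Seq[String]): Seq[String] = {
    val created = scala.collection.mutable.LinkedHashSet.empty[String]
    paths.foreach(p => ancestorsAndSelf(p).foreach(node => created += node))
    created.toSeq
  }

  def main(args: Array[String]): Unit =
    createAll(persistentPaths).foreach(println)
}
```

Running `main` prints one znode per line. Shared parents such as `/brokers` appear only once even though three of the paths live under them.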
III. Verifying that the meta.properties ensemble is valid
1. Whether cluster.id is always set in the meta.properties files
2. Whether node.id or broker.id is always set in the meta.properties files

    initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
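The two checks can be pictured with a simplified sketch. This is not the logic of `verify` itself: `MetaPropsSketch`, `parse` and the shape of the returned messages are assumptions made for illustration, and only the property names `cluster.id`, `node.id` and `broker.id` come from the text above.

```scala
import java.io.StringReader
import java.util.Properties

object MetaPropsSketch {
  // Parse the text of one meta.properties file
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Returns a list of problems; an empty list means every file passed both checks
  def verify(files: Seq[Properties], expectedClusterId: String): Seq[String] =
    files.zipWithIndex.flatMap { case (props, i) =>
      // Check 1: cluster.id must be present and equal to the expected value
      val clusterProblem = Option(props.getProperty("cluster.id")) match {
        case None => Some(s"file $i: cluster.id is missing")
        case Some(id) if id != expectedClusterId =>
          Some(s"file $i: cluster.id $id does not match $expectedClusterId")
        case _ => None
      }
      // Check 2: at least one of node.id / broker.id must be present
      val hasId = props.getProperty("node.id") != null || props.getProperty("broker.id") != null
      val idProblem = if (hasId) None else Some(s"file $i: neither node.id nor broker.id is set")
      clusterProblem.toList ++ idProblem.toList
    }

  def main(args: Array[String]): Unit = {
    val good = parse("cluster.id=abc\nbroker.id=0\n")
    val bad = parse("broker.id=1\n")
    verify(Seq(good, bad), "abc").foreach(println)
  }
}
```

Returning a list of problems instead of throwing on the first one makes it easy to report every inconsistent log directory in one pass. That is a design choice of this sketch only.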
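IV. Initializing the dynamic broker configuration
Dynamic broker configs are stored in ZooKeeper and can be defined at two levels:
1. Per-broker configs, persisted at /configs/brokers/{brokerId}. These can be described/altered through AdminClient using the resource name brokerId.
2. Cluster-wide defaults, persisted at /configs/brokers/default. These can be described/altered through AdminClient using an empty resource name.
The order of precedence for broker configs is:
1. DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}
2. DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/default
3. STATIC_BROKER_CONFIG: the properties the broker is started with, typically from the server.properties file
4. DEFAULT_CONFIG: the default configs defined in KafkaConfig

    config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)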
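The precedence order boils down to "first source that defines the key wins". Below is a minimal sketch of that rule with plain maps standing in for the four sources. `ConfigPrecedenceSketch` and `resolve` are hypothetical names, not part of Kafka, and the config key is only an example value.

```scala
object ConfigPrecedenceSketch {
  // Sources are checked in descending priority, mirroring the list above
  def resolve(key: String,
              dynamicBroker: Map[String, String],
              dynamicDefault: Map[String, String],
              staticBroker: Map[String, String],
              defaults: Map[String, String]): Option[String] =
    Seq(dynamicBroker, dynamicDefault, staticBroker, defaults)
      .collectFirst { case source if source.contains(key) => source(key) }

  def main(args: Array[String]): Unit = {
    val key = "log.cleaner.threads"
    val value = resolve(
      key,
      dynamicBroker = Map.empty,
      dynamicDefault = Map(key -> "4"),
      staticBroker = Map(key -> "2"),
      defaults = Map(key -> "1"))
    // No per-broker override here, so the cluster-wide dynamic default is picked
    println(value)
  }
}
```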
V. Starting KafkaController

    _kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
    kafkaController.startup()

1. KafkaController structure

    class KafkaController(...) {
      // Event manager
      private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
        controllerContext.stats.rateAndTimeMetrics)

      // Returns true if this broker's id equals the current controller id
      def isActive: Boolean = activeControllerId == config.brokerId

      @volatile private var brokerInfo = initialBrokerInfo
      @volatile private var _brokerEpoch = initialBrokerEpoch

      // Startup
      def startup(): Unit = {
        zkClient.registerStateChangeHandler(new StateChangeHandler {
          // ControllerHandler = "controller-state-change-handler"
          override val name: String = StateChangeHandlers.ControllerHandler
          override def afterInitializingSession(): Unit = {
            eventManager.put(RegisterBrokerAndReelect)
          }
          override def beforeInitializingSession(): Unit = {
            val queuedEvent = eventManager.clearAndPut(Expire)
            // Block initialization of the new session until the expiration event has been processed.
            // This ensures all pending events are handled before the new session is created.
            queuedEvent.awaitProcessing()
          }
        })
        eventManager.put(Startup)
        eventManager.start()
      }

      override def process(event: ControllerEvent): Unit = {
        event match {
          //.....
          case RegisterBrokerAndReelect => processRegisterBrokerAndReelect()
          case Startup => processStartup()
          //.....
        }
      }
    }
(1) ControllerEventManager structure

    class ControllerEventManager(...) {
      // A serializing queue is used instead of locks
      private val queue = new LinkedBlockingQueue[QueuedEvent]
      // ControllerEventThreadName = "controller-event-thread"
      private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

      def start(): Unit = thread.start()

      def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
        val queuedEvent = new QueuedEvent(event, time.milliseconds())
        queue.put(queuedEvent)
        queuedEvent
      }

      class ControllerEventThread(name: String) extends ShutdownableThread(...) {
        override def doWork(): Unit = {
          // Take an event from the queue (mostly controller-related events)
          val dequeued = pollFromEventQueue()
          dequeued.event match {
            case controllerEvent =>
              def process(): Unit = dequeued.process(processor)
              //...
          }
        }
      }
    }
ControllerEventThread, defined inside ControllerEventManager, extends ShutdownableThread. The parent class holds the real run(), which calls doWork(). doWork() in turn calls process(), so process() is what actually gets executed.

    public abstract class ShutdownableThread extends Thread {
        public abstract void doWork();

        public void run() {
            while (isRunning())
                doWork();
        }
    }
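This loop keeps running for as long as the thread is alive, so from then on any event added to the queue automatically has its handler executed. From startup() in KafkaController we know two events are involved: Startup is put on the queue directly, and RegisterBrokerAndReelect is put on the queue by the afterInitializingSession() callback. Let's see what each of them does.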
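The queue-plus-single-thread pattern can be tried out in isolation. The following is a minimal sketch, not Kafka's ControllerEventManager: `EventLoopSketch`, `runAll` and the `Stop` sentinel are made up for illustration, and the "handler" merely records the order in which events were seen.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  sealed trait Event
  case object Startup extends Event
  case object RegisterBrokerAndReelect extends Event
  case object Stop extends Event // hypothetical sentinel that ends the loop

  // Processes the events on one worker thread and returns them in processing order
  def runAll(events: Seq[Event]): Seq[Event] = {
    val queue = new LinkedBlockingQueue[Event]()
    val processed = ArrayBuffer.empty[Event]
    val done = new CountDownLatch(1)

    val worker = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          // take() blocks until an event is available
          queue.take() match {
            case Stop => running = false
            case event => processed += event
          }
        }
        done.countDown()
      }
    }, "controller-event-thread-sketch")

    worker.start()
    events.foreach(e => queue.put(e))
    queue.put(Stop)
    // Wait for the worker to drain the queue before reading the result
    done.await(5, TimeUnit.SECONDS)
    processed.toSeq
  }

  def main(args: Array[String]): Unit =
    println(runAll(Seq(Startup, RegisterBrokerAndReelect)))
}
```

Because only the worker thread ever handles events, the handlers need no locking of their own. This is the point of the "queue instead of locks" comment in the class above.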
2. Handling the RegisterBrokerAndReelect event

    private def processRegisterBrokerAndReelect(): Unit = {
      _brokerEpoch = zkClient.registerBroker(brokerInfo)
      processReelect()
    }

(1) Register the broker with ZooKeeper

    class KafkaZkClient private[zk] (...) {
      def registerBroker(brokerInfo: BrokerInfo): Long = {
        // /brokers/ids/<brokerId>
        val path = brokerInfo.path
        // Create an ephemeral znode for this broker id.
        // Being ephemeral, it disappears when the broker goes down.
        val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
        info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +
          s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")
        // Return the czxid (broker epoch)
        stat.getCzxid
      }
    }
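Two properties matter here. The znode is tied to the session that created it, and the creation transaction id that serves as broker epoch grows with every creation. The sketch below imitates both with an in-memory stand-in. `FakeZk` and its methods are hypothetical and only mimic the behaviour described above, not the ZooKeeper client API.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper, for illustration only
class FakeZk {
  private var zxid = 0L
  // path -> (owning session id, creation zxid)
  private val ephemerals = mutable.Map.empty[String, (Long, Long)]

  // Creates an ephemeral node and returns its creation zxid (the "broker epoch")
  def createEphemeral(sessionId: Long, path: String): Long = {
    require(!ephemerals.contains(path), s"node already exists: $path")
    zxid += 1
    ephemerals(path) = (sessionId, zxid)
    zxid
  }

  def exists(path: String): Boolean = ephemerals.contains(path)

  // Ending a session removes every ephemeral node that session created
  def closeSession(sessionId: Long): Unit = {
    val owned = ephemerals.collect { case (path, (owner, _)) if owner == sessionId => path }
    owned.foreach(path => ephemerals.remove(path))
  }
}

object EphemeralSketch {
  def main(args: Array[String]): Unit = {
    val zk = new FakeZk
    val epoch0 = zk.createEphemeral(sessionId = 1L, path = "/brokers/ids/0")
    val epoch1 = zk.createEphemeral(sessionId = 2L, path = "/brokers/ids/1")
    println(s"broker 0 epoch = $epoch0, broker 1 epoch = $epoch1")
    zk.closeSession(1L) // broker 0 "dies"
    println(s"broker 0 still registered: ${zk.exists("/brokers/ids/0")}")
  }
}
```

A broker that restarts registers again and gets a larger epoch than before, which lets other components tell the new incarnation from the old one.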
(2) Start the election

    class KafkaController(...) {
      private def processReelect(): Unit = {
        maybeResign()
        elect()
      }

      private def maybeResign(): Unit = {
        val wasActiveBeforeChange = isActive
        // Register a znode change handler in ZK. It fires when the controller changes
        // and prepares the ground for the election below.
        zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
        activeControllerId = zkClient.getControllerId.getOrElse(-1)
        if (wasActiveBeforeChange && !isActive) {
          // Triggered when the current broker resigns as controller
          onControllerResignation()
        }
      }

      private def elect(): Unit = {
        // Get the active controller. If the cluster has been running for a long time and a
        // broker has just been added, this returns the current controller.
        // If the cluster has only just started, there is no active controller and the result is -1.
        activeControllerId = zkClient.getControllerId.getOrElse(-1)
        /*
         * We can get here during the initial startup and the handleDeleted ZK callback.
         * Because of potential race conditions, the controller may already have been elected
         * by the time we get here. If this broker is already the controller, this check
         * prevents the following createEphemeralPath method from getting into an infinite loop.
         */
        if (activeControllerId != -1) {
          // A controller already exists, so stop the election; otherwise continue below
          debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
          return
        }

        // The try block can end in one of the following ways:
        // 1. Normal: this broker is elected controller
        // 2. Exception:
        //    a. ControllerMovedException
        //       - another broker was elected controller
        //       - a controller was elected but has just resigned, so a new election is needed
        //    b. Throwable: this broker was elected but failed while taking office.
        //       The controller znode is deleted and a new election takes place.
        try {
          val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
          controllerContext.epoch = epoch
          controllerContext.epochZkVersion = epochZkVersion
          activeControllerId = config.brokerId

          info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
            s"and epoch zk version is now ${controllerContext.epochZkVersion}")

          // Elected as controller: start carrying out the duties of that role
          onControllerFailover()
        } catch {
          case e: ControllerMovedException =>
            // Start watching the controller znode again
            maybeResign()
            if (activeControllerId != -1)
              debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
            else
              warn("A controller has been elected but just resigned, this will result in another round of election", e)
          case t: Throwable =>
            error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
              s"Trigger controller movement immediately", t)
            triggerControllerMove()
        }
      }
    }
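Stripped of logging and failure handling, the election is "first broker to register wins, everyone else accepts the winner". The following minimal sketch shows that under simplifying assumptions. `ElectionSketch` and `ControllerNode` are hypothetical, a synchronized in-memory object replaces the controller znode and the epoch counter, and losing the race is a `false` return value where the real code sees a ControllerMovedException.

```scala
object ElectionSketch {
  // Hypothetical stand-in for the controller znode plus the controller epoch
  final class ControllerNode {
    private var controllerId = -1 // -1 means "no active controller"
    private var epoch = 0

    def currentId: Int = synchronized { controllerId }
    def currentEpoch: Int = synchronized { epoch }

    // Succeeds only while no controller is registered; bumps the epoch on success
    def tryRegister(brokerId: Int): Boolean = synchronized {
      if (controllerId != -1) false
      else {
        controllerId = brokerId
        epoch += 1
        true
      }
    }

    // Models the ephemeral znode vanishing when the controller's session ends
    def delete(): Unit = synchronized { controllerId = -1 }
  }

  // One election attempt; returns the active controller id this broker sees afterwards
  def elect(brokerId: Int, node: ControllerNode): Int = {
    val existing = node.currentId
    if (existing != -1) existing                  // a controller already exists: stop
    else if (node.tryRegister(brokerId)) brokerId // this broker won
    else node.currentId                           // lost the race; -1 if the winner has already resigned
  }

  def main(args: Array[String]): Unit = {
    val node = new ControllerNode
    // Brokers 2, 0 and 1 attempt the election one after another
    Seq(2, 0, 1).foreach(id => println(s"broker $id sees controller ${elect(id, node)}"))
    println(s"controller epoch = ${node.currentEpoch}")
  }
}
```

Every successful registration increases the epoch, so a new controller can always be told apart from a stale one.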
The controllerChangeHandler that zkClient registers before the election simply watches the controller znode for changes.

    class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
      // The controller znode
      override val path: String = ControllerZNode.path

      override def handleCreation(): Unit = eventManager.put(ControllerChange)
      override def handleDeletion(): Unit = eventManager.put(Reelect)
      override def handleDataChange(): Unit = eventManager.put(ControllerChange)
    }
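The handler is just a translation table from znode changes to controller events. Here is a small sketch of that mapping, with a plain mutable queue in place of the event manager. `ChangeHandlerSketch` and `Handler` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

object ChangeHandlerSketch {
  sealed trait Event
  case object ControllerChange extends Event
  case object Reelect extends Event

  // Same three callbacks as above, writing into a plain queue
  final class Handler(queue: mutable.Queue[Event]) {
    def handleCreation(): Unit = queue.enqueue(ControllerChange)
    def handleDeletion(): Unit = queue.enqueue(Reelect)
    def handleDataChange(): Unit = queue.enqueue(ControllerChange)
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue.empty[Event]
    val handler = new Handler(queue)
    handler.handleCreation() // a controller znode appeared
    handler.handleDeletion() // the controller znode disappeared
    println(queue.toList)
  }
}
```

Only a deletion leads to a new election. A creation or data change just tells the broker that the controller is now someone else.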
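VI. Summary
1. Set up ZooKeeper (for example zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka) and create the persistent paths: consumers, brokers/ids, brokers/topics, config/changes, admin/delete_topics, brokers/seqid, isr_change_notification, latest_producer_id_block, log_dir_event_notification
2. Verify that the meta.properties ensemble is valid, which mainly means checking that each broker has its own unique id
3. Register each broker's id in ZooKeeper
4. Start KafkaController
5. Start the ControllerEventThread, which keeps consuming events from the LinkedBlockingQueue
6. Put the RegisterBrokerAndReelect and Startup events on the queue
7. Handle the RegisterBrokerAndReelect event first
8. Register the broker with ZooKeeper by creating an ephemeral znode
9. Register the controllerChangeHandler, which simply watches the controller znode for changes
10. Every broker tries to register itself in ZooKeeper as the controller
11. Normally exactly one broker registers successfully. The others get a ControllerMovedException and keep watching the controller znode
12. If a broker wins the election but fails while taking office, it steps down immediately and a new round of election starts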