广州白云网站建设公司,php+ajax网站开发典型实例pdf,赣州人才网官方网站,4p 4c 4r营销理论区别Kafka消费者 TCP管理创建 TCPFindCoordinator连接协调者消费数据TCP 连接数关闭 TCP 连接消费者的程序入口类是 KafkaConsumer
构建 KafkaConsumer 时 #xff0c;不会创建任何 TCP 连接TCP 连接是用 KafkaConsumer.poll 创建
创建 TCP
poll 创建 TCP 的地方 :
发起 FindC…
Kafka消费者 TCP管理创建 TCPFindCoordinator连接协调者消费数据TCP 连接数关闭 TCP 连接消费者的程序入口类是 KafkaConsumer
构建 KafkaConsumer 时 不会创建任何 TCP 连接TCP 连接是用 KafkaConsumer.poll 创建
创建 TCP
poll 创建 TCP 的地方 :
发起 FindCoordinator 请求时连接协调者时消费数据时
FindCoordinator
协调者 (Coordinator) : 驻留在 Broker 的内存中
负责消费组的组成员管理和各个消费者的位移提交管理当消费者首次用 poll 时发送 FindCoordinator 请求到任意个Broker (负载最小) 发送请求并告知 Broker 的协调者
负载评估 : 消费者连接所有 Broker 中待发送请求最少
连接协调者
Broker 处理完 FindCoordinator 请求后会返回 Broker 的协调者
消费者知道协调者后就对该 Broker 进行 Socket 连接成功连接协调者后就能组协调操作如 : 加入组、等待组分配方案、心跳请求处理、位移获取、位移提交
消费数据
消费者给每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP
例子 : 消费者要消费 5 个分区的数据这 5 个分区的领导者副本分布在 4 台 Broker 上那消费者在消费时 会与这 4 台 Broker 的创建 Socket 连接
TCP 连接数
消费者创建 3 类 TCP 连接
确定协调者和获取集群元数据连接协调者令其执行组成员管理操作执行实际的消息获取
Kafka 日志:
# 消费者程序创建的第一个 TCP 连接用于发送 FindCoordinator 请求
# 消费者创建第一个连接它连接的 Broker 节点的 ID 是 -1 :
# 消费者不知道 Kafka Broker 的任何信息
[2019-05-27 10:00:54,142] DEBUG [Consumer clientIdconsumer-1, groupIdtest] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者复用上次创建 Socket 连接
# 向 Kafka 发送元数据请求获取整个集群的信息
[2019-05-27 10:00:54,188] DEBUG [Consumer clientIdconsumer-1, groupIdtest] Sending metadata request MetadataRequestData(topics[MetadataRequestTopic(name‘t4’)], allowAutoTopicCreationtrue, includeClusterAuthorizedOperationsfalse, includeTopicAuthorizedOperationsfalse) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)# 消费者开始发送 FindCoordinator 请求里的 Broker
# 即 localhost:9092nodeId -1
[2019-05-27 10:00:54,188] TRACE [Consumer clientIdconsumer-1, groupIdtest] Sending FIND_COORDINATOR {keytest,key_type0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)# 消费者成功协调者的 Broker 信息(node_id 2) 后
# 消费者就知道协调者 Broker 的连接信息
[2019-05-27 10:00:54,203] TRACE [Consumer clientIdconsumer-1, groupIdtest] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms0,error_code0,error_messagenull, node_id2,hostlocalhost,port9094} (org.apache.kafka.clients.NetworkClient:837)# 发第二个 Socket 连接TCP连接 localhost:9094
# 只有连接协调者后消费者才能开启消费组的各种功能
[2019-05-27 10:00:54,204] DEBUG [Consumer clientIdconsumer-1, groupIdtest] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者要创建新 TCP 连接用于实际的消息获取
# 消费分区的领导者副本在哪台 Broker消费者连接那个 Broker
# 消费者创建 3 个 TCP 连接:
# localhost:9092localhost:9093 和 localhost:9094
[2019-05-27 10:00:54,237] DEBUG [Consumer clientIdconsumer-1, groupIdtest] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,237] DEBUG [Consumer clientIdconsumer-1, groupIdtest] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,238] DEBUG [Consumer clientIdconsumer-1, groupIdtest] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)ID -1 原因 :
消费者程序其实也不光是消费者生产者也是这样的机制首次启动时对 Kafka 集群一无所知因此用 -1 来表示尚未获取到 Broker 数据
ID 2147483645 原因 :
Integer.MAX_VALUE - 协调者的 Broker ID协调者 ID 是 2Socket 连接节点 ID Integer.MAX_VALUE - 2 2147483645这种节点 ID 目的 : 让组协调请求和真正的数据获取请求使用不同的 Socket 连接
关闭 TCP 连接
消费者关闭 Socket :
主动关闭 : 调用 KafkaConsumer.close() 或执行 killKafka 自动关闭 : 由 connection.max.idle.ms 控制 (默认值: 9 分钟)当某个 Socket 连续 9 分钟都没有任何请求消费者杀掉该 Socket 连接