当前位置: 首页 > news >正文

音乐网站制作课程报告网站使用标题做路径

音乐网站制作课程报告,网站使用标题做路径,连锁租车网站源码,立方集团 网站一、前提 kafka的版本是 2.6.2 一般我们消费kafka的时候是指定消费组#xff0c;是不会指定消费组内部消费kafka各个分区的分配策略#xff0c;但是我们也可以指定消费策略#xff0c;通过源码发现#xff0c;我们可以有三种分区策略#xff1a; RangeAssignor (默认是不会指定消费组内部消费kafka各个分区的分配策略但是我们也可以指定消费策略通过源码发现我们可以有三种分区策略 RangeAssignor (默认RoundRobinAssignorStickyAssignor 指定消费分区策略 props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);kafka消费分区策略的分区入口类是ConsumerCoordinator的performAssignment方法 Overrideprotected MapString, ByteBuffer performAssignment(String leaderId,String assignmentStrategy,ListJoinGroupResponseData.JoinGroupResponseMember allSubscriptions) {//获取分区策略ConsumerPartitionAssignor assignor lookupAssignor(assignmentStrategy);//存储消费组订阅的所有topicSetString allSubscribedTopics new HashSet();//存储消费组内各个消费者对应的基本信息比如元数据MapString, Subscription subscriptions new HashMap();MapString, ListTopicPartition ownedPartitions new HashMap();for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {Subscription subscription ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));subscriptions.put(memberSubscription.memberId(), subscription);allSubscribedTopics.addAll(subscription.topics());ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());}//具体实现在类 AbstractPartitionAssignor 各个分区算法的抽象类MapString, Assignment assignments assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();...log.info(Finished assignment for group at generation {}: {}, generation().generationId, assignments);...return groupAssignment;}AbstractPartitionAssignor 的 assign() //各个分区策略具体的算法public abstract MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions);Overridepublic GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {MapString, Subscription subscriptions groupSubscription.groupSubscription();SetString allSubscribedTopics new HashSet();for (Map.EntryString, Subscription subscriptionEntry : subscriptions.entrySet())allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());MapString, Integer partitionsPerTopic new HashMap();for (String topic : allSubscribedTopics) {Integer numPartitions metadata.partitionCountForTopic(topic);if (numPartitions ! null numPartitions 0)partitionsPerTopic.put(topic, numPartitions);elselog.debug(Skipping assignment for topic {} since no metadata is available, topic);}/构建参数 partitionsPerTopicmap表示各个topic有多少个分区//subscriptions map表示消费者相关信息消费者id消费者对应的主题MapString, ListTopicPartition rawAssignments assign(partitionsPerTopic, subscriptions);// this class maintains no user data, so just wrap the resultsMapString, Assignment assignments new HashMap();for (Map.EntryString, ListTopicPartition assignmentEntry : rawAssignments.entrySet())assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));return new GroupAssignment(assignments);}下面说明下RangeAssignor与RoundRobinAssignor两种分区策略的区别 二、RangeAssignor 分区策略 RangeAssignor是默认分配的策略 public class RangeAssignor extends AbstractPartitionAssignor {Overridepublic String name() {return range;}private MapString, ListMemberInfo consumersPerTopic(MapString, Subscription consumerMetadata) {MapString, ListMemberInfo topicToConsumers new HashMap();for (Map.EntryString, Subscription subscriptionEntry : consumerMetadata.entrySet()) {String consumerId subscriptionEntry.getKey();MemberInfo memberInfo new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());for (String topic : subscriptionEntry.getValue().topics()) {put(topicToConsumers, topic, memberInfo);}}return topicToConsumers;}Overridepublic MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {//获取主题对应的消费者列表//partitionsPerTopic 主题对应分区个数//subscriptions 消费者的信息消费者id消费者对应的主题消费者实例MapString, ListMemberInfo consumersPerTopic consumersPerTopic(subscriptions);//打印输出可以看到消费组group-one有两个消费者 consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 和 consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd//其中 consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd 消费了 test_topic_partition_one 和 test_topic_partition_two// consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 只消费了 test_topic_partition_one// consumersPerTopic: {test_topic_partition_one[MemberInfo [member.id: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3, group.instance.id: {}], MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]], test_topic_partition_two[MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]]}MapString, ListTopicPartition assignment new HashMap();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList());for (Map.EntryString, ListMemberInfo topicEntry : consumersPerTopic.entrySet()) {//获取topicString topic topicEntry.getKey();//获取topic对应的消费者ListMemberInfo consumersForTopic topicEntry.getValue();//获取topic的分区数Integer numPartitionsForTopic partitionsPerTopic.get(topic);if (numPartitionsForTopic null)continue;Collections.sort(consumersForTopic);//计算每个消费者至少消费几个分区int numPartitionsPerConsumer numPartitionsForTopic / consumersForTopic.size();//计算剩余几个分区int consumersWithExtraPartition numPartitionsForTopic % consumersForTopic.size();//获取主题分区列表ListTopicPartition partitions AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i 0, n consumersForTopic.size(); i n; i) {int start numPartitionsPerConsumer * i Math.min(i, consumersWithExtraPartition);//可以看到前面的消费者会多分配一个分区int length numPartitionsPerConsumer (i 1 consumersWithExtraPartition ? 0 : 1);//计算每个消费者对应的分区列表可以看到前面的消费者会多分配一个分区assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start length));}}return assignment;} }举例说明构建消费组下两个消费者 test_topic_partition_one和test_topic_partition_two都是9个分区 进程一 props.put(group.id, group-one);props.put(auto.offset.reset, latest);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one, test_topic_partition_two));进程二 props.put(group.id, group-one);props.put(auto.offset.reset, latest);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one));通过上面的分配算法可以得到 消费者consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd消费的分区为 test_topic_partition_one-0, test_topic_partition_one-1, test_topic_partition_one-2, test_topic_partition_one-3, test_topic_partition_one-4, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-5, test_topic_partition_one-6, test_topic_partition_one-7, test_topic_partition_one-8如果进程二也消费两个主题则对应的关系变成 通过上面的分配算法可以得到 消费者consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd消费的分区为 test_topic_partition_one-0, test_topic_partition_one-1, test_topic_partition_one-2, test_topic_partition_one-3, test_topic_partition_one-4, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4,消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-5, test_topic_partition_one-6, test_topic_partition_one-7, test_topic_partition_one-8, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8可以看到第一个消费者比第二个消费者多消费一个test_topic_partition_one的分区而且是连续的。同时可以看到分类是按照topic粒度区分的也就是每个消费者消费一个topic的分区与其他topic是无关的。可以会导致第一个实例运行压力较大的问题。 三、RoundRobinAssignor 分区策略 public class RoundRobinAssignor extends AbstractPartitionAssignor {Overridepublic MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {MapString, ListTopicPartition assignment new HashMap();//存储消费组下所有的消费者构建两个消费者// 其中一个consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5// 另一个consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855aListMemberInfo memberInfoList new ArrayList();for (Map.EntryString, Subscription memberSubscription : subscriptions.entrySet()) {assignment.put(memberSubscription.getKey(), new ArrayList());memberInfoList.add(new MemberInfo(memberSubscription.getKey(),memberSubscription.getValue().groupInstanceId()));}//排序后的消费者CircularIteratorMemberInfo assigner new CircularIterator(Utils.sorted(memberInfoList));for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {final String topic partition.topic();//轮询指定消费者的分区while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic)) {assigner.next();}assignment.get(assigner.next().memberId).add(partition);}return assignment;}//获取排序后的所有主题分区private ListTopicPartition allPartitionsSorted(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {SortedSetString topics new TreeSet();for (Subscription subscription : subscriptions.values())topics.addAll(subscription.topics());ListTopicPartition allPartitions new ArrayList();for (String topic : topics) {Integer numPartitionsForTopic partitionsPerTopic.get(topic);if (numPartitionsForTopic ! null)allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));}return allPartitions;}Overridepublic String name() {return roundrobin;} }举例说明构建消费组下两个消费者 test_topic_partition_one和test_topic_partition_two都是9个分区 进程一 props.put(group.id, group-one);props.put(auto.offset.reset, latest);//指定轮询策略props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one, test_topic_partition_two));props.put(group.id, group-one);props.put(auto.offset.reset, latest);//指定轮询策略props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one));通过上面的分配算法可以得到 消费者consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5消费的分区为 test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7可以看到test_topic_partition_one分区是轮流的分配给两个消费者的 对应的日志 2024-08-19 14:28:34 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Line:626] [Consumer clientIdconsumer-group-one-1, groupIdgroup-one] Finished assignment for group at generation 44: {consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5Assignment(partitions[test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8]), consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855aAssignment(partitions[test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7])}如果进程二也消费两个主题则对应的关系变成 消费者consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5消费的分区为 test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-1, test_topic_partition_two-3, test_topic_partition_two-5, test_topic_partition_two-7消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7 test_topic_partition_two-0, test_topic_partition_two-2, test_topic_partition_two-4, test_topic_partition_two-6, test_topic_partition_two-8也就是会把所有的分区轮流分给两个消费者所以这种模式就和主题个数与主题分区有关了。
http://www.dnsts.com.cn/news/207993.html

相关文章:

  • 为什么要立刻做网站wordpress文章显示在页面
  • 自建网站服务器备案西地那非片能延时多久
  • 东莞做网站有哪些做网站什么空间比较好
  • 手机端快速建站工具网站建设专业的公司排名
  • 汕头教育学会网站建设学网站开发好吗
  • 如何推广网站运营ag娱乐建设网站
  • 什么是网站的自适应域名注册好如何网站建设
  • 网站开发与设计 课程简介什么样建广告网站
  • 六安网站自然排名优化价格长沙哪家网站公司
  • 手机网站适应屏幕在家用电脑做网站
  • 企业网站中文域名有必要续费吗怎样做类似淘宝网的网站
  • 做宣传网站开发公司需要什么资质
  • 挖矿网站怎么免费建设网站建站建设的公司
  • 比较好的网页网站设计成都有什么互联网公司
  • 为什么四川省建设厅网站打不开2014苏州建设银行招聘网站
  • 怎样查看网站是用什么cms_做的123邢台招聘信息网
  • 在线包车网站建设百度seo服务方案
  • 做类似昵图网网站天猫商城支付方式
  • 山东省建设机械协会网站军事最新消息
  • 创意二维码制作网站北京建网站哪家公司好
  • 用织梦做领券网站网站建设人员岗位职责
  • 网站建设张世勇网站 制作 工具
  • 深圳建站公司价格程序员做网站
  • 食品营销型网站手机网站建设维护协议书
  • 做系统简单还是网站简单代理服务器ip国外
  • 山西餐饮加盟网站建设搜狗收录网站
  • 微信辅助做单网站大型资讯门户网站怎么做排名
  • 网站推广的全过程济南官网排名推广
  • 网站开发 工具广告版式设计图片
  • 图片网站怎样选择虚拟主机厦门企业网站制作