当前位置: 首页 > news >正文

怎么做返利网站专业网站建设咨询

怎么做返利网站,专业网站建设咨询,公司名字大全2022,互助网站开发一些理论 1.topic支持多分区#xff0c;每个分区只能被组内的一个消费者消费#xff0c;一个消费者可能消费多个分区的数据#xff1b; 2.消费者组重平衡的分区策略#xff0c;是由消费者自己决定的#xff0c;具体是从消费者组中选一个作为leader进行分区方案分配#…一些理论 1.topic支持多分区每个分区只能被组内的一个消费者消费一个消费者可能消费多个分区的数据 2.消费者组重平衡的分区策略是由消费者自己决定的具体是从消费者组中选一个作为leader进行分区方案分配 3.每条消息都有一个唯一的offsetkafka保证单个分区的消息有序因为每个分区的消息是按顺序写入的消费者是按offset拉取 4.自动提交和手动提交自动提交是指 sdk 开启了一个协程定时自动提交已经标记处理的消息的offset而不是说拉到消息就自动提交手动提交则需要业务代码手动提交offset如果是每消费一条消息再手动提交一次这样是同步操作会降低消费者消费速度可以考虑批量处理多个消息再一起提交 5.消费模式kafka是拉模式消费者定时从kafka拉取消息 6.服务发布更新、重启、k8s中pod扩缩容 会导致消费者组内消费者成员数量发生变化进而发生消费者组重平衡重平衡期间stw消费者组短暂停止拉取拉取 会导致消息堆积这种重平衡无法避免stw时间取决于服务升级期间的耗时 源码分析 消费者组接口 type ConsumerGroup interface {Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) errorErrors() -chan errorClose() error }type ConsumerGroupHandler interface {Setup(ConsumerGroupSession) errorCleanup(ConsumerGroupSession) errorConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error }type ConsumerGroupClaim interface {Topic() stringPartition() int32InitialOffset() int64HighWaterMarkOffset() int64Messages() -chan *ConsumerMessage }type ConsumerGroupSession interface {Claims() map[string][]int32MemberID() stringGenerationID() int32MarkOffset(topic string, partition int32, offset int64, metadata string)Commit()ResetOffset(topic string, partition int32, offset int64, metadata string)MarkMessage(msg *ConsumerMessage, metadata string)Context() context.Context }消息拉取 可以在一个请求拉多个分区的数据然后按分区分类 func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {// 。。。go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc }response, err : bc.fetchNewMessages()func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {request : FetchRequest{MinBytes: bc.consumer.conf.Consumer.Fetch.Min,MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),}// 。。。for child : range bc.subscriptions {request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)fmt.Printf(fetchNewMessages topic%s partition%d offset%d fetchSize%d\n,child.topic, child.partition, child.offset, child.fetchSize)}return bc.broker.Fetch(request) }//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available func (bc *brokerConsumer) subscriptionConsumer() {-bc.wait // wait for our first piece of workfor newSubscriptions : range bc.newSubscriptions {bc.updateSubscriptions(newSubscriptions)if len(bc.subscriptions) 0 {// Were about to be shut down or were about to receive more subscriptions.// Either way, the signal just hasnt propagated to our goroutine yet.-bc.waitcontinue}response, err : bc.fetchNewMessages()fmt.Printf([%s]subscriptionConsumer.fetchNewMessages...\n, time.Now())if err ! nil {Logger.Printf(consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n, bc.broker.ID(), err)bc.abort(err)return}bc.acks.Add(len(bc.subscriptions))for child : range bc.subscriptions {// 每个分区处理器都写入fmt.Printf(subscriptionConsumer write %s %d %v\n, child.topic, child.partition, response)child.feeder - response}bc.acks.Wait()bc.handleResponses()} }消息解析处理 func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {child : partitionConsumer{consumer: c,conf: c.conf,topic: topic,partition: partition,messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),feeder: make(chan *FetchResponse, 1),trigger: make(chan none, 1),dying: make(chan none),fetchSize: c.conf.Consumer.Fetch.Default,}if err : child.chooseStartingOffset(offset); err ! nil {return nil, err}var leader *Brokervar err errorif leader, err c.client.Leader(child.topic, child.partition); err ! nil {return nil, err}if err : c.addChild(child); err ! nil {return nil, err}go withRecover(child.dispatcher)// 每个分区起一个协程处理go withRecover(child.responseFeeder)fmt.Printf(\nConsumePartition go %s %d %d\n, topic, partition, offset)child.broker c.refBrokerConsumer(leader)child.broker.input - childreturn child, nil } 自动提交offset func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {// Check that we are not dealing with a closed Client before processing any other argumentsif client.Closed() {return nil, ErrClosedClient}conf : client.Config()om : offsetManager{client: client,conf: conf,group: group,poms: make(map[string]map[int32]*partitionOffsetManager),memberID: memberID,generation: generation,closing: make(chan none),closed: make(chan none),}if conf.Consumer.Offsets.AutoCommit.Enable {om.ticker time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)go withRecover(om.mainLoop)}return om, nil }func (om *offsetManager) mainLoop() {defer om.ticker.Stop()defer close(om.closed)for {select {case -om.ticker.C:om.Commit()case -om.closing:return}} }func (om *offsetManager) Commit() {om.flushToBroker()om.releasePOMs(false) }func (om *offsetManager) flushToBroker() {req : om.constructRequest()if req nil {return}broker, err : om.coordinator()if err ! nil {om.handleError(err)return}resp, err : broker.CommitOffset(req)if err ! nil {fmt.Printf(broker.CommitOffset fail %v\n, err)om.handleError(err)om.releaseCoordinator(broker)_ broker.Close()return}om.handleResponse(broker, req, resp) }标记位移 func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {s.MarkOffset(msg.Topic, msg.Partition, msg.Offset1, metadata) } func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {pom.lock.Lock()defer pom.lock.Unlock()if offset pom.offset {pom.offset offsetpom.metadata metadatapom.dirty true} }func (om *offsetManager) constructRequest() *OffsetCommitRequest {var r *OffsetCommitRequestvar perPartitionTimestamp int64if om.conf.Consumer.Offsets.Retention 0 {perPartitionTimestamp ReceiveTimer OffsetCommitRequest{Version: 1,ConsumerGroup: om.group,ConsumerID: om.memberID,ConsumerGroupGeneration: om.generation,}} else {r OffsetCommitRequest{Version: 2,RetentionTime: int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),ConsumerGroup: om.group,ConsumerID: om.memberID,ConsumerGroupGeneration: om.generation,}}om.pomsLock.RLock()defer om.pomsLock.RUnlock()for _, topicManagers : range om.poms {for _, pom : range topicManagers {pom.lock.Lock()if pom.dirty {r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)}pom.lock.Unlock()}}// 有处理的才提交if len(r.blocks) 0 {return r}return nil }消费者重平衡 func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err : c.client.Coordinator(c.groupID)if err ! nil {if retries 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err : c.joinGroupRequest(coordinator, topics)if err ! nil {_ coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId join.MemberId {members, err : join.GetMembers()if err ! nil {return nil, err}// 分配分区plan, err c.balance(members)if err ! nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err : c.syncGroupRequest(coordinator, plan, join.GenerationId)if err ! nil {_ coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic-partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) 0 {members, err : groupRequest.GetMemberAssignment()if err ! nil {return nil, err}claims members.Topicsc.userData members.UserDatafor _, partitions : range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler) }使用例子 消费者-自动提交 package mainimport (contextfmtgithub.com/Shopify/sarama )func main() {config : sarama.NewConfig()config.Version sarama.V2_0_0_0config.Consumer.Offsets.Initial sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable true // 自动提交config.Consumer.Return.Errors truevar (brokers []string{localhost:9092}groupID g1topics []string{test3})group, err : sarama.NewConsumerGroup(brokers, groupID, config)if err ! nil {panic(err)}defer func() { _ group.Close() }()// Track errorsgo func() {for err : range group.Errors() {fmt.Println(ERROR, err)}}()// Iterate over consumer sessions.ctx : context.Background()for {handler : exampleConsumerGroupHandler{}// Consume should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr : group.Consume(ctx, topics, handler)if err ! nil {panic(err)}} }type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf(Setup %q %v, se.MemberID(), se.Claims())return nil } func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg : range claim.Messages() {fmt.Printf(Message topic:%q partition:%d offset:%d ts:%s val:%s\n,msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)//time.Sleep(time.Second * 10)sess.MarkMessage(msg, )//sess.Commit()//fmt.Printf(\n\nafter commit\n)}return nil } 消费者-手动提交 package mainimport (contextfmtgithub.com/Shopify/sarama )func main() {config : sarama.NewConfig()config.Version sarama.V2_0_0_0config.Consumer.Offsets.Initial sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable falseconfig.Consumer.Return.Errors truevar (brokers []string{localhost:9092}groupID g1topics []string{test3})group, err : sarama.NewConsumerGroup(brokers, groupID, config)if err ! nil {panic(err)}defer func() { _ group.Close() }()// Track errorsgo func() {for err : range group.Errors() {fmt.Println(ERROR, err)}}()// Iterate over consumer sessions.ctx : context.Background()for {handler : exampleConsumerGroupHandler{}// Consume should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr : group.Consume(ctx, topics, handler)if err ! nil {panic(err)}} }type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf(Setup %q %v, se.MemberID(), se.Claims())return nil } func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg : range claim.Messages() {fmt.Printf(Message topic:%q partition:%d offset:%d ts:%s val:%s\n,msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)//time.Sleep(time.Second * 10)sess.MarkMessage(msg, )sess.Commit()}return nil } 生产者 同步发送 package mainimport (fmtlogosgithub.com/Shopify/sarama )var (logger log.New(os.Stderr, , log.LstdFlags) )func main() {var (brokers []string{localhost:9092}topic test3)config : sarama.NewConfig()config.Producer.Return.Successes true/*0 不发送任何响应TCP ACK就是你得到的全部WaitForLocal RequiredAcks1 只等待本地提交成功后再进行响应。WaitForAll RequiredAcks-1 等待所有同步副本提交后再响应。*/config.Producer.RequiredAcks sarama.WaitForAll // WaitForAll等待所有同步副本提交后再响应。producer, err : sarama.NewSyncProducer(brokers, config)if err ! nil {fmt.Printf(Failed to open Kafka producer: %s, err)return}defer func() {if err : producer.Close(); err ! nil {logger.Println(Failed to close Kafka producer cleanly:, err)}}()message : sarama.ProducerMessage{Topic: topic,Key: sarama.StringEncoder(k1),Value: sarama.StringEncoder(v1),}partition, offset, err : producer.SendMessage(message)if err ! nil {fmt.Printf(Failed to produce message: %s, err)}fmt.Printf(produce %d/%d\n, partition, offset) } shell 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test3创建topic 分区数3 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test查看堆积情况位移差值越大堆积越严重 [rootlocalhost kafka_2.12-2.5.1] # [kube:] ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group g1GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID g1 test3 0 4 4 0 sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama g1 test3 1 4 4 0 sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama g1 test3 2 3 3 0 sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama g1 test 0 4 4 0 - - -
http://www.dnsts.com.cn/news/165496.html

相关文章:

  • 专业做能源招聘的网站浙江省建设工程质监站网站
  • 秦皇岛住房建设网站国内新闻最新消息今天简短
  • asp本地网站无法打开长春房产网官网
  • 湖北网站定制开发价格表电子网站建设前台设计
  • 东坑网站建设公司深圳网站设..
  • 家纺营销型网站黄山网站建设jidela
  • 加强网站队伍建设自助企业建站模板
  • 旅游便宜网站建设wordpress虚拟资源交易平台
  • 做网站需要数据库么18款禁用软件app网站入口
  • 株洲网站建设哪家好企业招聘广告模板
  • 电子商务网站建设调查报告房地产销售政策
  • 教育网站开发公司企业做网站都购买域名吗
  • 网站背景图片优化wordpress适配手机端
  • 玉器珠宝做网站潜江做网站的公司
  • 手机端网站首页怎么做北京道路建设在什么网站查询
  • 哪个网站做飞机订单做得比较好的公司网站
  • 企业建设网站的步骤是什么网站怎么做mip技术
  • 建设大厦网站家居网站建设行业现状
  • 国外高校实验室网站建设成果深圳网站设计公司在哪里
  • 滁州建设网站百度网站v认证
  • 纯html网站开发工具高端网站建设注意
  • 网站都有什么功能河北建设工程信息网计算机辅助系统
  • 巨野县城乡和住房建设局网站网站业务员怎么给客户做方案
  • 微信视频网站怎么做的好处百度公司地址
  • 上海建设银行网站家里的网络用哪个公司
  • 杭州文化传媒类高端网站建设公司客户管理系统 软件
  • 做网站首选智投未来1做孵化的网站
  • 公司制作一个网站网站做下子压缩文件的链接
  • 菏泽住房和城乡建设部网站wordpress插件排列
  • 男女之间做那个的网站wordpress分类目录和标签的作用