福州网站制作有限公司,潍坊做网站多少钱,微信官方网站首页,建设英文网站公司哪家好本人理解#xff0c;顺序消息如果不分消息组#xff0c;那么会影响并行处理速度#xff0c;所以尽量消息组分的散一些 首先上要求#xff0c;官方文档如下#xff1a; 总结#xff1a; 1.必须同一个消息组#xff0c;消息组和消费组不是一个概念#xff0c;不要混 2.必…本人理解顺序消息如果不分消息组那么会影响并行处理速度所以尽量消息组分的散一些 首先上要求官方文档如下 总结 1.必须同一个消息组消息组和消费组不是一个概念不要混 2.必须单一生产者也就是说线上生产只能开一个 pod感觉局限有点高无法多 pod 接入 3.必须串行发送这个点也不太好限制过高 以上三点 在我的案例中企业微信回调消息 那么只能开一个接口服务Pod 来接收微信回调如果挂了那完蛋开多个 pod的话那只有把请求放队列通过队列pop进行消费再生产消息到 mq,这样同时也解决了第三点的串行发送。
生产的时候如何确定是顺序消息,只需要生产消息的时候给设定一个消息组
msg : rmq_client.Message{
Topic: Topic,Body: []byte(this is a message : strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys(a, b)
msg.SetTag(ab)
// 这里设置消息组
msg.SetMessageGroup(fifo)分组分的越细越好
提高消费速度
当拿到消息后根据消息分组来进行并发处理每个分组内进行串行处理关键代码如下 func (m *RocketMq) ConsumerOrderly(funcMap map[string]func([]byte) error) {var err errorm.consumerOnce.Do(func() {Log().Info(##############pro consume orderly start#############, m.MqConfig)errTmp : m.proSimConsumer.Start()if errTmp ! nil {Log().Panic(MQ启动失败, errTmp)return}})defer m.proSimConsumer.GracefulStop()// 总体保证有N个在运行var ch make(chan int, m.MaxGoroutine)for {fmt.Println(start receive message)mvs, errReceive : m.proSimConsumer.Receive(context.TODO(), 8, 20*time.Second)if errReceive ! nil {if strings.Contains(errReceive.Error(), no new message) {Log().Info(errReceive)} else {Log().Error(顺序消息拉取MQ消息失败:, errReceive)}time.Sleep(time.Second * 2)continue}var msgGroupMap make(map[string][]*rmq_client.MessageView, 0)for _, v : range mvs {if _, ok : funcMap[*v.GetTag()]; !ok {Log().Error(v.GetTag(), : action do not exist)continue} else {// 根据msgGroup汇总msgGroup : v.GetMessageGroup()msgGroupMap[*msgGroup] append(msgGroupMap[*msgGroup], v)}}// 最大程度多线程消费for _, item : range msgGroupMap {ch - 1go func(item []*rmq_client.MessageView) {for _, v : range item {fmt.Println(*v.GetTag(), *v.GetMessageGroup(), string(v.GetBody()))action, _ : funcMap[*v.GetTag()]if errTmp : action(v.GetBody()); errTmp ! nil {Log().Error(mq顺序消费失败, errTmp)break} else {m.proSimConsumer.Ack(context.TODO(), v)}}-ch}(item)}}
}最后需要注意的点
同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致并且也必须保证订阅 Topic 时设置的过滤规则Tag一致。否则您的消息可能会丢失 请保证订阅一致性