网站开发的前景,中小微企业查询平台,微信红包封面分销平台,如何做电商创业消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构)
消息中间件: acti…消息中间件的对比 消息中间件 ActiveMQ RabbitMQ RocketMQ kafka 开发语言 java erlang java scala 单击吞吐量 万级 万级 10万级 10万级 时效性 ms us ms ms 可用性 高(主从架构) 高(主从架构) 非常高(主从架构) 非常高(主从架构)
消息中间件: activeMQ:javajms协议性能一般吞吐量低。rabbitMQ:erlang(amqp协议)性能好功能丰富吞吐量一般。rocketMQjava性能好吞吐量丰富功能丰富。Kafka: scala吞吐量最大功能单一大数据领域
RocketMQ 是阿里开源的分布式消息中间件跟其它中间件相比RocketMQ 的特点是纯JAVA实现是一套提供了消息生产存储消费全过程API的软件系统。 RocketMQ的作用数据收集、限流削峰、异步解耦 数据收集 分布式系统会产生海量级数据流如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总然后对这些数据流进行大数据分析这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。
限流削峰 MQ可以将系统的超量请求暂存其中以便系统后期可以慢慢进行处理从而避免了请求的丢失或系统被压垮。
异步解耦 上游系统对下游系统的调用若为同步调用则会大大降低系统的吞吐量与并发度且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化一般性做法就是在这两层间添加一个MQ层。 rocketmq.apache.org Broker:经纪人(经理人)
Topic主题消息区分分类,虚拟结构
Queue消息队列 Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
发送消息
发送同步消息
同步消息发送后会用一个返回值也就是MQ服务器接收到消息返回的一个确认这种方式非常安全但是性能就没那么高而在MQ集群中也是要等到所有的从机都复制了消息以后才会返回这种方式适合重要消息的场景
Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);
//连接namesrv
producer.setNamesrvAddr(192.168.68.133:9876);
//启动
producer.start();
//创建消息
Message message new Message(Topic,消息.getBytes());
SendResult send producer.send(message);
System.out.println(发送状态send.getSendStatus());
//关闭生产者
producer.shutdown();
}Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.68.133:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(Topic,*);//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println(消费了new String(messageExt.getBody()));
}
System.out.println(消费者上下文context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
} 发送异步消息
异步消息常用在对响应时间敏感的业务场景发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知 Test
void aysncProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);
//连接namesrv
producer.setNamesrvAddr(192.168.68.133:9876);
//启动
producer.start();
//创建消息
Message message new Message(aysncTopic,异步消息.getBytes());
producer.send(message, new SendCallback() {
Override
public void onSuccess(SendResult sendResult) {
System.out.println(发送成功sendResult);
}Override
public void onException(Throwable throwable) {
System.err.println(发送失败throwable);
}
});
System.out.println(执行了);
//关闭生产者
//producer.shutdown();
//挂起当前jvm
System.in.read();
}Test
void aysncConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.68.133:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(aysncTopic,*);//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println(消费了new String(messageExt.getBody()));
}
System.out.println(消费者上下文context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
} 发送单向消息
单向消息发送这种方式不关心发送结果的场景这种方式吞吐量大但存在消息丢失的风险。使用案例日志信息发送
Test
void OnewayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(Oneway);
//连接namesrv
producer.setNamesrvAddr(192.168.68.133:9876);
//启动
producer.start();
//创建消息
Message message new Message(Oneway,单向消息.getBytes());
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}Test
void OnewayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.68.133:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(Oneway,*);//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println(消费了new String(messageExt.getBody()));
}
System.out.println(消费者上下文context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送延时消息
发送延时消息顾名思义。场景比如淘宝商城下单后并未支付有30分钟未支付订单状态 Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(delayGroup);
//连接namesrv
producer.setNamesrvAddr(192.168.68.133:9876);
//启动
producer.start();
//创建消息
Message message new Message(delay,延迟消息.getBytes());
//设置延时 根据官方延时等级
message.setDelayTimeLevel(2);
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}Test
void delayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.68.133:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(delay,*);//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println(消费了new String(messageExt.getBody()));
}
System.out.println(消费者上下文context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送批量消息
批量消息可以一次性发送一组消息
Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(delayGroup);
//连接namesrv
producer.setNamesrvAddr(192.168.68.133:9876);
//启动
producer.start();
//创建消息
//批量消息
ListMessage messages Arrays.asList(
new Message(delay,批量消息1.getBytes()),
new Message(delay,批量消息2.getBytes()),
new Message(delay,批量消息3.getBytes())
);
//发送消息
producer.send(messages);
//关闭生产者
producer.shutdown();
}
发送带标签消息
RocketMQ提供消息过滤功能可根据业务逻辑区分带有A标签的被A消费带有B标签的被B消费
Test
void TagProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(TagGroup);
//连接namesrv
producer.setNamesrvAddr(192.168.68.133:9876);
//启动
producer.start();
//创建消息
//批量消息
Message message1 new Message(tagTopic, tagA, tag标签内容A.getBytes());
Message message2 new Message(tagTopic, tagB, tag标签内容B.getBytes());
//发送消息
producer.send(message1);
producer.send(message2);
//关闭生产者
producer.shutdown();
}Test
void TagAConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.68.133:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(tagTopic,tagA);//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println(消费了new String(messageExt.getBody()));
}
System.out.println(消费者上下文context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}Test
void TagBConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.68.133:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(tagTopic,tagB);//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
Override
public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println(消费了new String(messageExt.getBody()));
}
System.out.println(消费者上下文context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
} 发送顺序消息
发送的消息要保证消息是一定有序的顺序消息发送到同一个队列
实体类Data
AllArgsConstructor
public class MessageM {
private int userID;
private String desc;
}顺序消息发送到同一个队列private ListMessageM messageMs Arrays.asList(
new MessageM(1,下单),
new MessageM(1,付款),
new MessageM(1,配送),
new MessageM(2,下单),
new MessageM(2,付款),
new MessageM(2,配送)
);Test
void orderProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);
//连接namesrv
producer.setNamesrvAddr(192.168.211.131:9876);
//启动
producer.start();messageMs.forEach(messageM - {
//创建消息
Message message new Message(orderMsg,messageM.toString().getBytes());
//发送顺序消息发送到同一个队列
try {
//相同的userID去相同的队列
producer.send(
message,
new MessageQueueSelector() {
Override
public MessageQueue select(ListMessageQueue list, Message message, Object o) {
//选择队列
int hashCode o.toString().hashCode();
int i hashCode % list.size();
return list.get(i);
}
},
messageM.getUserID());
} catch (MQClientException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//关闭生产者
producer.shutdown();
} Test
void orderConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerTest);
//连接namesrv
consumer.setNamesrvAddr(192.168.211.131:9876);
//订阅主题 *表示该主题的所有消息
consumer.subscribe(orderMsg,*);//设置监听器(一直,异步回调方式)
//MessageListenerConcurrently 并发模式多线程
//MessageListenerOrderly 顺序模式单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
//消费方法
Override
public ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println(当前线程IDThread.currentThread().getId());
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}