佛山专业网站营销,网站seo内链建设,员工管理系统源码,网站建设参考文献英文书籍大家好#xff0c;本文主要是按照官网的教程把消费者和生产者的示例写下来#xff0c;开箱即用。
RocketMQ安装 安装请参考官方安装教程#xff1a;
快速开始 | RocketMQhttps://rocketmq.apache.org/zh/docs/quickStart/01quickstart 本人安装的是最新版本5.x#xff0c… 大家好本文主要是按照官网的教程把消费者和生产者的示例写下来开箱即用。
RocketMQ安装 安装请参考官方安装教程
快速开始 | RocketMQhttps://rocketmq.apache.org/zh/docs/quickStart/01quickstart 本人安装的是最新版本5.x上述教程中的发送与消费官方已经写的很详细就不贴出来了。 maven依赖如下 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion5.1.0/version/dependency
生产者示例
同步发送 同步发送是最常用的方式是指消息发送方发出一条消息后会在收到服务端同步响应之后才发下一条消息的通讯方式可靠的同步传输被广泛应用于各种场景如重要的通知消息、短消息通知等。代码如下
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** ClassName CommonProducer* Description 同步发送是最常用的方式是指消息发送方发出一条消息后会在收到服务端同步响应之后才发下一条消息的通讯方式可靠的同步传输被广泛应用于各种场景如重要的通知消息、短消息通知等* Author admin* Date 2023/11/1 15:46* Version 1.0*/
public class CommonSyncProducer {public static void main(String[] args) throws Exception {// 初始化一个producer并设置Producer group nameDefaultMQProducer producer new DefaultMQProducer(eric_group); //1// 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9800); //2// 启动producerproducer.start();for (int i 0; i 100; i) {// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); //3// 利用producer进行发送并同步等待发送结果SendResult sendResult producer.send(msg); //4System.out.printf(%s%n, sendResult);}// 一旦producer不再使用关闭producerproducer.shutdown();}
}
异步发送 异步发送是指发送方发出一条消息后不等服务端返回响应接着发送下一条消息的通讯方式。 消息发送方在发送了一条消息后不需要等待服务端响应即可发送第二条消息发送方通过回调接口接收服务端响应 并处理响应结果。异步发送一般用于链路耗时较长对响应时间较为敏感的业务场景。例如视频上传后通知启动转码服务 转码完成后通知推送转码结果等。代码如下
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** ClassName CommonAsyncProducer* Description 异步发送是指发送方发出一条消息后不等服务端返回响应接着发送下一条消息的通讯方式。* 消息发送方在发送了一条消息后不需要等待服务端响应即可发送第二条消息发送方通过回调接口接收服务端响应* 并处理响应结果。异步发送一般用于链路耗时较长对响应时间较为敏感的业务场景。例如视频上传后通知启动转码服务* 转码完成后通知推送转码结果等。* Author admin* Date 2023/11/1 16:40* Version 1.0*/
public class CommonAsyncProducer {public static void main(String[] args) throws Exception {// 初始化一个producer并设置Producer group nameDefaultMQProducer producer new DefaultMQProducer(eric_group);// 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9876);// 启动producerproducer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount 50;final CountDownLatch countDownLatch new CountDownLatch(messageCount);for (int i 0; i messageCount; i) {try {final int index i;// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤Message msg new Message(TopicTest,TagA,Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送消息, 发送结果通过callback返回给客户端producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.printf(%-10d OK %s %n, index,sendResult.getMsgId());countDownLatch.countDown();}Overridepublic void onException(Throwable e) {System.out.printf(%-10d Exception %s %n, index, e);e.printStackTrace();countDownLatch.countDown();}});} catch (Exception e) {e.printStackTrace();countDownLatch.countDown();}}//异步发送如果要求可靠传输必须要等回调接口返回明确结果后才能结束逻辑否则立即关闭Producer可能导致部分消息尚未传输成功countDownLatch.await(5, TimeUnit.SECONDS);// 一旦producer不再使用关闭producerproducer.shutdown();}
}
顺序消息 顺序消息是一种对消息发送和消费顺序有严格要求的消息。 对于一个指定的Topic消息严格按照先进先出FIFO的原则进行消息发布和消费即先发布的消息先消费后发布的消息后消费。 在 Apache RocketMQ 中支持分区顺序消息如下图所示。我们可以按照某一个标准对消息进行分区比如图中的ShardingKey 同一个ShardingKey的消息会被分配到同一个队列中并按照顺序被消费。 需要注意的是 RocketMQ 消息的顺序性分为两部分生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。 生产顺序性 RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息并按序存储和持久化。如需保证消息生产的顺序性则必须满足以下条件 单一生产者 消息生产的顺序性仅支持单一生产者不同生产者分布在不同的系统即使设置相同的分区键不同生产者之间产生的消息也无法判定其先后顺序。 串行发送生产者客户端支持多线程安全访问但如果生产者使用多线程并行发送则不同线程间产生的消息将无法判定其先后顺序。 满足以上条件的生产者将顺序消息发送至服务端后会保证设置了同一分区键的消息按照发送顺序存储在同一队列中。代码如下
public class OrderedProducer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer new DefaultMQProducer(order_group);// 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9800); //2producer.start();String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};for (int i 0; i 100; i) {int orderId i % 10;Message msg new Message(TopicTest, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg, new MessageQueueSelector() {// 其中 mqs 是可以发送的队列msg是消息arg是上述send接口中传入的Object对象返回的是该消息需要发送到的队列。// 上述例子里是以orderId作为分区分类标准对所有队列个数取余来对将相同orderId的消息发送到同一个队列中。//生产环境中建议选择最细粒度的分区键进行拆分例如将订单ID、用户ID作为分区键关键字可实现同一终端用户的消息按照顺序处理// 不同用户的消息无需保证顺序。Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Integer id (Integer) arg;int index id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf(%s%n, sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}
延时消息 在分布式定时调度触发、任务超时处理等场景需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻辑实现高性能、可扩展、高可靠的定时触发能力。一共支持18个等级的延迟投递延时级别如下1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;
代码如下
public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer new DefaultMQProducer(delay_group);producer.setNamesrvAddr(127.0.0.1:9800); //2// Launch producerproducer.start();int totalMessagesToSend 1;for (int i 0; i totalMessagesToSend; i) {Message message new Message(TopicTest, (Hello scheduled message i).getBytes());// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);// Send the messageproducer.send(message);}// Shutdown producer after use.producer.shutdown();}
}
批量发送消息 这里调用非常简单将消息打包成 Collection msgs 传入方法中即可需要注意的是批量消息的大小不能超过 1MiB否则需要自行分割其次同一批 batch 中 topic 必须相同代码如下
public class SimpleBatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(batch_group);producer.setNamesrvAddr(127.0.0.1:9800);producer.start();//If you just send messages of no more than 1MiB at a time, it is easy to use batch//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule supportString topic TopicTest;ListMessage messages new ArrayList();messages.add(new Message(topic, TagSB, OrderID001, Hello world 0.getBytes()));messages.add(new Message(topic, TagSB, OrderID002, Hello world 1.getBytes()));messages.add(new Message(topic, TagSB, OrderID003, Hello world 2.getBytes()));producer.send(messages);}
}事务消息 事务消息发送分为两个阶段。第一阶段会发送一个半事务消息半事务消息是指暂不能投递的消息 生产者已经成功地将消息发送到了 Broker但是Broker 未收到生产者对该消息的二次确认此时该消息被标记成“暂不能投递”状态 如果发送成功则执行本地事务并根据本地事务执行成功与否向 Broker 半事务消息状态commit或者rollback 半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因导致某条事务消息的二次确认丢失 Broker 端会通过扫描发现某条消息长期处于“半事务消息”时需要主动向消息生产者询问该消息的最终状态Commit或是Rollback。 这样最终保证了本地事务执行成功下游就能收到消息本地事务执行失败下游就收不到消息。总而保证了上下游数据的一致性。 事务消息发送步骤如下 1、生产者将半事务消息发送至 RocketMQ Broker。 2、RocketMQ Broker 将消息持久化成功之后向生产者返回 Ack 确认消息已经发送成功此时消息暂不能投递为半事务消息。 3、生产者开始执行本地事务逻辑。 4、生产者根据本地事务执行结果向服务端提交二次确认结果Commit或是Rollback服务端收到确认结果后处理逻辑如下 二次确认结果为Commit服务端将半事务消息标记为可投递并投递给消费者。 二次确认结果为Rollback服务端将回滚事务不会将半事务消息投递给消费者。 5、在断网或者是生产者应用重启的特殊情况下若服务端未收到发送者提交的二次确认结果或服务端收到的二次确认结果为Unknown未知状态经过固定时间后服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 :::note 需要注意的是服务端仅仅会按照参数尝试指定次数超过次数后事务会强制回滚因此未决事务的回查时效性非常关键需要按照业务的实际风险来设置 ::: 事务消息回查步骤如下 7. 生产者收到消息回查后需要检查对应消息的本地事务执行的最终结果。 8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认服务端仍按照步骤4对半事务消息进行处理。代码如下
public class TransactionProducer {/**** 事务消息的发送不再使用 DefaultMQProducer而是使用 TransactionMQProducer 进行发送* 例子中设置了事务回查的线程池如果不设置也会默认生成一个最重要的是需要实现 TransactionListener 接口* 并传入 TransactionMQProducer*/public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener new TransactionListenerImpl();TransactionMQProducer producer new TransactionMQProducer(transaction_group);producer.setNamesrvAddr(127.0.0.1:9800);ExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};for (int i 0; i 10; i) {try {Message msg new Message(TopicTest, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.printf(%s%n, sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i 0; i 100000; i) {Thread.sleep(1000);}producer.shutdown();}/**** executeLocalTransaction 是半事务消息发送成功后执行本地事务的方法具体执行完本地事务后可以在该方法中返回以下三种状态** LocalTransactionState.COMMIT_MESSAGE提交事务允许消费者消费该消息* LocalTransactionState.ROLLBACK_MESSAGE回滚事务消息将被丢弃不允许消费。* LocalTransactionState.UNKNOW暂时无法判断状态等待固定时间以后Broker端根据回查规则向生产者进行消息回查。** checkLocalTransaction是由于二次确认消息没有收到Broker端回查事务状态的方法。回查规则本地事务执行完成后* 若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW或生产者应用退出导致本地事务未提交任何状态。* 则Broker端会向消息生产者发起事务回查第一次回查后仍未获取到事务状态则之后每隔一段时间会再次回查。*/static class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex new AtomicInteger(0);private ConcurrentHashMapString, Integer localTrans new ConcurrentHashMap();Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value transactionIndex.getAndIncrement();int status value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status localTrans.get(msg.getTransactionId());if (null ! status) {switch (status) {case 0:System.out.println(checkLocalTransaction return UNKNOW);return LocalTransactionState.UNKNOW;case 1:System.out.println(checkLocalTransaction return COMMIT_MESSAGE);return LocalTransactionState.COMMIT_MESSAGE;case 2:System.out.println(checkLocalTransaction return ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;default:System.out.println(checkLocalTransaction return COMMIT_MESSAGE);return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}}
} 下一篇讲一下消费者
完整代码Xu Qian/rocketmq-client