当前位置: 首页 > news >正文

网站的行为怎么做东阳光门户平台

网站的行为怎么做,东阳光门户平台,如何做网站做网站需要多少钱,怎么搭建一个网站教程RabbitMQ学习笔记 一些概念 Broker #xff1a;RabbitMQ服务。 virtual host#xff1a; 其实就是分组。 Connection#xff1a;连接#xff0c;生产者消费者与Broker之间的TCP连接。 Channel#xff1a;网络信道#xff0c;轻量级的Connection#xff0c;使用Chann…RabbitMQ学习笔记 一些概念 Broker RabbitMQ服务。 virtual host 其实就是分组。 Connection连接生产者消费者与Broker之间的TCP连接。 Channel网络信道轻量级的Connection使用Channel可以减少Connection的建立减少开销。 Message消息由 Properties和 Body组成Properties可以对消息的优先级、延迟等特性进行记录Body存储消息体的内容。 Exchange交换机没有消息存储功能负责分发消息。 BindingExchange和Queue之间的虚拟连接其中可以包含Routing Key。 Routing Key路由规则用于确定如何分发、接收消息。 Queue消息队列保存消息并将其转发给消费者进行消费。 安装 Windows安装 安装erLang语言 进入官网 下载完之后一直下一步安装即可安装完成后进入目录配置环境变量 安装RabbitMQ服务端 Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server (github.com) 一直下一步安装即可 安装完成后打开安装目录进入到这个文件夹打开命令行 输入命令安装插件 rabbitmq-plugins enable rabbitmq_management完成后双击rabbitmq-server.bat 打开http://localhost:15672/ 用户名密码是guest/guest Linux下使用 Docker 安装 直接拉取最新版 docker pull rabbitmq运行容器 docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management进入容器 docker exec -it rabbitmq /bin/bash开启管理插件 rabbitmq-plugins enable rabbitmq_management打开管理网站 http://localhost:15672/ 4369, 25672 (Erlang发现集群端口) 5672, 5671 (AMQP端口) 15672 (web管理后台端口) 61613, 61614 (STOMP协议端口) 1883, 8883 (MQTT协议端口) 用户名密码均为 guest 实操 官网例子 简单模式 配置文件 application-easy.yml spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /queue: easy-queue生产者 package com.gettler.rabbitmq.easy;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(easy) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ProducerTest {// 队列名称Value(${spring.rabbitmq.queue})public String QUEUE_NAME;private static final Logger logger LoggerFactory.getLogger(ProducerTest.class);Testpublic void testProducer() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();/*创建一个队列1.队列名称2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message hello, this is an easy message;/*发送一个消息1.发送到那个交换机(空代表默认交换机)2.路由key3.其他的参数信息4.发送消息的消息体*/channel.basicPublish(, QUEUE_NAME, null, message.getBytes());logger.info(消息发送完毕);} }消费者 package com.gettler.rabbitmq.easy;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(easy) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerTest {// 队列名称Value(${spring.rabbitmq.queue})public String QUEUE_NAME;private static final Logger logger LoggerFactory.getLogger(ConsumerTest.class);Testpublic void testConsumer() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback (consumerTag, message) - {logger.info(消费消息成功,消息内容为: new String(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};/*消费者消费消息1.消费的队列名称2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)3.消费者消费消息的回调(函数式接口)4.消费者取消消费的回调(函数式接口)*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);} }工作模式 配置文件 application-work.yml spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: /queue: work-queue生产者 package com.gettler.rabbitmq.work;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;import java.util.Scanner;ActiveProfiles(work) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ProducerTest {// 队列名称Value(${spring.rabbitmq.queue})public String QUEUE_NAME;private static final Logger logger LoggerFactory.getLogger(ProducerTest.class);Value(${spring.rabbitmq.host})private String host;Value(${spring.rabbitmq.username})private String username;Value(${spring.rabbitmq.password})private String password;Testpublic void testProducer() throws Exception {System.out.println(this.host);// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(fanout, BuiltinExchangeType.FANOUT);/*创建一个队列1.队列名称2.队列里面的消息是否持久化(默认为false,代表消息存储在内存中)3.该队列是否只供一个消费者进行消费,是否进行共享(true表示可以多个消费者消费)4.表示最后一个消费者断开连接以后,该队列是否自动删除(true表示自动删除)5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);Scanner scanner new Scanner(System.in);while (scanner.hasNext()) {String message scanner.next();/*发送一个消息1.发送到那个交换机(空代表默认交换机)2.路由key3.其他的参数信息4.发送消息的消息体*/channel.basicPublish(, QUEUE_NAME, null, message.getBytes());logger.info(消息发送完毕);}} }消费者A package com.gettler.rabbitmq.work;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(work) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerATest {// 队列名称Value(${spring.rabbitmq.queue})public String QUEUE_NAME;private static final Logger logger LoggerFactory.getLogger(ConsumerATest.class);Testpublic void testConsumerA() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback (consumerTag, message) - {logger.info(消费消息成功,消息内容为: new String(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};/*消费者消费消息1.消费的队列名称2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)3.消费者消费消息的回调(函数式接口)4.消费者取消消费的回调(函数式接口)*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);} }消费者B package com.gettler.rabbitmq.work;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(work) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerBTest {// 队列名称Value(${spring.rabbitmq.queue})public String QUEUE_NAME;private static final Logger logger LoggerFactory.getLogger(ConsumerBTest.class);Testpublic void testConsumerB() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 消费消息的回调DeliverCallback deliverCallback (consumerTag, message) - {logger.info(消费消息成功,消息内容为: new String(message.getBody()));};// 取消消费的回调CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};/*消费者消费消息1.消费的队列名称2.消费成功之后是否要自动应答(true代表自动应答,false代表手动应答)3.消费者消费消息的回调(函数式接口)4.消费者取消消费的回调(函数式接口)*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);} }路由模式 配置文件 application-direct.yml spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: / 生产者 package com.gettler.rabbitmq.direct;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap; import java.util.Map;ActiveProfiles(direct) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ProducerTest {private static final Logger logger LoggerFactory.getLogger(ProducerTest.class);Testpublic void testProducer() throws Exception {// 创建channelConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(direct, BuiltinExchangeType.DIRECT);MapString, String messageMap new HashMap();messageMap.put(info, 普通 info 信息);messageMap.put(warning, 警告 warning 信息);messageMap.put(error, 错误 error 信息);messageMap.put(debug, 调试 debug 信息);for (Map.EntryString, String mes : messageMap.entrySet()) {String routingKey mes.getKey();String message mes.getValue();channel.basicPublish(direct, routingKey, null, message.getBytes());logger.info(消息发送完毕);}} }消费者A package com.gettler.rabbitmq.direct;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(direct) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerATest {private static final Logger logger LoggerFactory.getLogger(ConsumerATest.class);Testpublic void testConsumerA() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 创建channel// 声明交换机channel.exchangeDeclare(direct, BuiltinExchangeType.DIRECT);// 声明临时队列channel.queueDeclare(console, false, false, false, null);// 绑定队列与交换机channel.queueBind(console, direct, info);channel.queueBind(console, direct, warning);// 消费消息DeliverCallback deliverCallback (consumerTag, message) - {logger.info(获得消息: new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};channel.basicConsume(console, true, deliverCallback, cancelCallback);} }消费者B package com.gettler.rabbitmq.direct;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(direct) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerBTest {private static final Logger logger LoggerFactory.getLogger(ConsumerBTest.class);Testpublic void testConsumerB() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(direct, BuiltinExchangeType.DIRECT);// 声明临时队列channel.queueDeclare(disk, false, false, false, null);// 绑定队列与交换机channel.queueBind(disk, direct, error);// 消费消息DeliverCallback deliverCallback (consumerTag, message) - {logger.info(获得消息: new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};channel.basicConsume(disk, true, deliverCallback, cancelCallback);} }广播模式 配置文件 application-fanout.yml spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: / 生产者 package com.gettler.rabbitmq.fanout;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(fanout) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ProducerTest {private static final Logger logger LoggerFactory.getLogger(ProducerTest.class);Testpublic void testProducer() throws Exception {// 创建channelConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(fanout, BuiltinExchangeType.FANOUT);// 发送10条消息for (int i 0; i 10; i) {String message i ;channel.basicPublish(fanout, , null, message.getBytes());logger.info(消息发送完毕 message);}} }消费者A package com.gettler.rabbitmq.fanout;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(fanout) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerATest {private static final Logger logger LoggerFactory.getLogger(ConsumerATest.class);Testpublic void testConsumerA() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(fanout, BuiltinExchangeType.FANOUT);// 声明临时队列String queueName channel.queueDeclare().getQueue();// 绑定队列与交换机channel.queueBind(queueName, fanout, );// 消费消息DeliverCallback deliverCallback (consumerTag, message) - {logger.info(获得消息: new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};channel.basicConsume(queueName, true, deliverCallback, cancelCallback);} }消费者B package com.gettler.rabbitmq.fanout;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;ActiveProfiles(fanout) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ConsumerBTest {private static final Logger logger LoggerFactory.getLogger(ConsumerBTest.class);Testpublic void testConsumerB() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(fanout, BuiltinExchangeType.FANOUT);// 声明临时队列String queueName channel.queueDeclare().getQueue();// 绑定队列与交换机channel.queueBind(queueName, fanout, );// 消费消息DeliverCallback deliverCallback (consumerTag, message) - {logger.info(获得消息: new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};channel.basicConsume(queueName, true, deliverCallback, cancelCallback);} }主题模式 配置文件 application-topic.yml spring:rabbitmq:host: 123.123.123.123port: 5672username: Gettlerpassword: ********virtual-host: / 生产者 package com.gettler.rabbitmq.topic;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap; import java.util.Map;/*** author Gettler* date 2024/06/13*/ ActiveProfiles(topic) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class ProducerTest {private static final Logger logger LoggerFactory.getLogger(ProducerTest.class);Testpublic void testProducer() throws Exception {// 创建channelConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();Channel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(topic, BuiltinExchangeType.TOPIC);MapString, String messageMap new HashMap();messageMap.put(class1.DB.exam, 一班数据库考试通知);messageMap.put(class1.OS.exam, 一班操作系统考试通知);messageMap.put(class2.DB.exam, 二班数据库考试通知);messageMap.put(class2.OS.exam, 二班操作系统考试通知);for (Map.EntryString, String mes : messageMap.entrySet()) {String routingKey mes.getKey();String message mes.getValue();channel.basicPublish(topic, routingKey, null, message.getBytes());logger.info(消息发送完毕);}} }消费者A模拟一班的学生 package com.gettler.rabbitmq.topic;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;/*** author Gettler* date 2024/06/13*/ ActiveProfiles(topic) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class StudentOfClass1Consumer {private static final Logger logger LoggerFactory.getLogger(StudentOfClass1Consumer.class);Testpublic void testStudentOfClass1Consumer() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(topic, BuiltinExchangeType.TOPIC);// 创建Q1队列channel.queueDeclare(student_of_class1, false, false, false, null);// 绑定队列与交换机channel.queueBind(student_of_class1, topic, class1.#);// 消费消息DeliverCallback deliverCallback (consumerTag, message) - {logger.info(获得消息: new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) - {logger.info(消息消费被中断);};channel.basicConsume(student_of_class1, true, deliverCallback, cancelCallback);} }消费者B模拟操作系统老师 package com.gettler.rabbitmq.topic;import com.gettler.rabbitmq.RabbitmqApplication; import com.gettler.rabbitmq.config.RabbitMqConnectionFactory; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner;/*** author Gettler* date 2024/06/13*/ ActiveProfiles(topic) RunWith(SpringRunner.class) SpringBootTest(classes RabbitmqApplication.class, webEnvironment SpringBootTest.WebEnvironment.RANDOM_PORT) public class TeacherConsumer {private static final Logger logger LoggerFactory.getLogger(TeacherConsumer.class);Testpublic void testTeacherConsumer() throws Exception {// 创建一个connectionConnection connection RabbitMqConnectionFactory.getSingleInstanceConnection();// 创建一个channelChannel channel connection.createChannel();// 声明交换机channel.exchangeDeclare(topic, BuiltinExchangeType.TOPIC);// 创建Q1队列channel.queueDeclare(teacher_of_OS, false, false, false, null);// 绑定队列与交换机channel.queueBind(teacher_of_OS, topic, #.OS.#);// 消费消息DeliverCallback deliverCallback (consumerTag, message) - {System.out.println(获得消息: new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) - {System.out.println(消息消费被中断);};channel.basicConsume(teacher_of_OS, true, deliverCallback, cancelCallback);} }谷粒商城 RabbitMQ 学习笔记 新建Maven项目 添加依赖 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.0.0/version /dependency编写发送端 package org.example;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { //队列名称 private final static String QUEUE_NAME helloMQ; public static void main(String[] argv) throws java.io.IOException, TimeoutException { /** * 创建连接连接到MabbitMQ */ ConnectionFactory factory new ConnectionFactory(); //设置MabbitMQ所在主机ip或者主机名 factory.setHost(localhost); //创建一个连接 Connection connection factory.newConnection(); //创建一个频道 Channel channel connection.createChannel(); //指定一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送的消息 String message hello world!; //往队列中发出一条消息 channel.basicPublish(, QUEUE_NAME, null, message.getBytes()); System.out.println( [x] Sent message ); //关闭频道和连接 channel.close(); connection.close(); } } 编写接收端 package org.example;import com.rabbitmq.client.*;import java.io.IOException;public class Recv {// 队列名称private final static String QUEUE_NAME helloMQ;public static void main(String[] argv) throws Exception {// 打开连接和创建频道与发送端一样ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();//声明队列主要为了防止消息接收者先运行此程序队列还不存在时创建队列。channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);//创建消费者Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message new String(body, UTF-8);System.out.println( [x] Received message );}};channel.basicConsume(QUEUE_NAME, true, consumer);} }运行接收端 运行发送端每运行一次发送一次消息 管理网站上有接收端的连接发送端发送后便断开连接了 添加依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency编写配置文件 spring.rabbitmq.host192.168.3.200 spring.rabbitmq.port5672 spring.rabbitmq.virtual-host/创建Exchange public void createExchange() {DirectExchange directExchange new DirectExchange(hello-java-exchange, true, false);amqpAdmin.declareExchange(directExchange); }创建Queue public void createQueue() {Queue queue new Queue(hello-java-queue, true, false, false);amqpAdmin.declareQueue(queue); }连接Queue和Exchange public void createBinding() {Binding binding new Binding(hello-java-queue, Binding.DestinationType.QUEUE, hello-java-exchange, hello.java, null);amqpAdmin.declareBinding(binding); }发送消息 public void sendMessage() {String msg hello world;ListString s new ArrayList();s.add(msg);s.add(List);rabbitTemplate.convertAndSend(hello-java-exchange, hello.java, s, new CorrelationData(UUID.randomUUID().toString())); }接收消息 想要接受对象消息需使用JSON序列化机制进行消息转换 编写MyRabbitConfig配置类 Configuration public class MyRabbitConfig {AutowiredRabbitTemplate rabbitTemplate;/*** 使用JSON序列化机制进行消息转换* return*/Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();} }使用RabbitListener注解监听队列该注解参数可以是Object content, Message message, Channel channel。 RabbitListener(queues {hello-java-queue}) public void receiveMessage(Object message) {System.out.println(接受到消息内容 message); }可靠抵达 编写配置文件 # 开启发送端确认 spring.rabbitmq.publisher-confirm-typecorrelated # 开启发送端消息抵达队列的确认 spring.rabbitmq.publisher-returnstrue # 抵达队列后以异步发送优先回调抵达队列后的回调returnconfirm spring.rabbitmq.template.mandatorytrue # 手动ack消息 spring.rabbitmq.listener.simple.acknowledge-modemanual将MyRabbitConfig修改为 Configuration public class MyRabbitConfig {AutowiredRabbitTemplate rabbitTemplate;/*** 使用JSON序列化机制进行消息转换** return*/Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}PostConstruct // MyRabbitConfig对象创建完成后执行该方法public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 消息抵达节点的话ack就为true* param correlationData 当前消息的唯一关联数据消息唯一ID* param ack 消息是否成功收到* param cause 失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(confirming...correlationData{ correlationData }ack{ ack },cause{ cause });}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 没抵达队列触发这个失败回调函数* param message* param replyCode* param replyText* param exchange* param routingKey*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(Unreachable...message{ message }replyCode{ replyText }exchange{ exchange }routingKey{ routingKey });}});} }监听队列方法修改为 RabbitListener(queues {hello-java-queue}) public void receiveMessage(Message message, List list, Channel channel) throws IOException {System.out.println(接受到消息内容 list);// channel内按顺序递增long deliveryTag message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag);// 签收try {channel.basicAck(deliveryTag, false); // 是否批量签收} catch (Exception e) {// 网络中断// b1 false 丢弃 b1 true 发回服务器服务器重新入队。channel.basicNack(deliveryTag, false, false);} }
http://www.dnsts.com.cn/news/194753.html

相关文章:

  • 建设卡开通网银网站云南 网站建设
  • 简约网站内容布局vr功能网站建设
  • 网站开发和推广方案ui设计软件sketch
  • 网站优化外链wordpress分享微信插件下载地址
  • 长春专业网站建设公司网站建设 素材
  • 加强红色网站建设广告代运营公司
  • 爱站网长尾关键词挖掘工具网站设计流行趋势
  • 网站海报是怎么做的做机网站
  • 网站服务器和空间有没有做软件的外包网站
  • 查看网站开发鞍山网站建设企业
  • 自己做付费网站义乌多语言网站建设
  • 成都网站建设培训学校祥云平台做网站好不好
  • 京东网站建设设计框架图如何在网上做网站推广
  • 找人做淘宝网站需要多少钱福田做商城网站建设哪家效益快
  • 上海网站建设代码萧涵wordpress主题
  • 顺德网站开发怎么做领券网站
  • 广州万网建网站南昌市住房和城乡建设网站
  • 做动态图片的网站长沙建一个网站多少钱
  • 微商城网站建设网站制作好后 怎样上线
  • 互联网推广网站建设网页游戏挂机软件
  • 网站建设类论文格式脑卒中中心建设网站
  • 如何利用开源代码做网站石家庄seo网站优化电话
  • 十堰网站建设哪家专业青海城乡建设厅网站 官网
  • 网站标题关键词中国响应式网站案例
  • 求一外国h网站网站设计开发软件
  • 个人网站模板响应式wordpress地址怎么找回
  • 网站空间费用福州开发公司
  • 萧山做网站设计网络广告营销的概念
  • 重庆网站推广人员亚马逊网站
  • 做直播导航网站好本地搭建linux服务器做网站