优秀网站模板欣赏,网络规划设计师贴吧,用js做简单的网站页面,社区网站建设方案书示例一、Routing exchange类型direct#xff0c;根据消息的routekey将消息直接转发到指定队列。producer.ts 生产者主要发送消息#xff0c;consumer.ts负责接收消息#xff0c;同时也都可以创建exchange交换机#xff0c;创建队列#xff0c;为队列绑定exchange#xff…示例一、Routing exchange类型direct根据消息的routekey将消息直接转发到指定队列。producer.ts 生产者主要发送消息consumer.ts负责接收消息同时也都可以创建exchange交换机创建队列为队列绑定exchange为避免重复简化代码提高可维护性队列相关操作移动到消费者端。队列exchange交换机推荐在启动程序前手动创建好。
producer.ts
import RabbitMQ from amqplib/callback_api;function start() {RabbitMQ.connect(amqp://admin:admin1234localhost:5672?heartbeat60, function (err0, conn) {if (err0) {console.error([AMQP], err0.message);return setTimeout(start, 1000);}conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);return setTimeout(start, 1000);});console.log([AMQP] connected);conn.createChannel(async (err2, channel) {if (err2) {console.error([AMQP], err2.message);return setTimeout(start, 1000);}const exchangeName exchange1;channel.assertExchange(exchangeName,direct,{durable: true},(err, ok) {if (err) {console.log(exchange路由转发创建失败, err);} else {let args [info, warn, error];for (let i 0; i 10; i) {// console.log(message send!, channel.sendToQueue(// queueName,// Buffer.from(发送消息,${i}${Math.ceil(Math.random() * 100000)}),// { persistent: true, correlationId: ooooooooooooooo },// 消息持久化重启后存在// // (err: any, ok: Replies.Empty){}// ));const routeKey args[Math.floor(Math.random() * 3)];console.log(消息发送是否成功, channel.publish(exchangeName,routeKey,Buffer.from(发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}),{ persistent: true },));}}});});setTimeout(() {conn.close();process.exit(0);}, 1000);});
}start();consumer.ts
import RabbitMQ, { type Replies } from amqplib/callback_api;RabbitMQ.connect(amqp://admin:admin1234localhost:5672, (err0, conn) {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName queue1;channel.assertQueue(queueName, { durable: true }, (err2) {if (err2) {console.log(队列创建失败, err2);return;}console.log([*] waiting...);// 一次只有一个未确认消息防止消费者过载channel.prefetch(1);channel.bindQueue(queueName, exchange1, info, {}, (err3, ok) {console.log(queueName, 队列绑定结果, err3, ok);});channel.bindQueue(queueName, exchange1, warn, {}, (err3, ok) {console.log(queueName, 队列绑定结果, err3, ok);});channel.bindQueue(queueName, exchange1, error, {}, (err3, ok) {console.log(queueName, 队列绑定结果, err3, ok);});channel.consume(queueName,function (msg) {console.log(接收到的消息, msg?.content.toString());/*// 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {// channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},(err3: any, ok: Replies.Empty) {console.log(err3, ok);},);});});conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);});
});consumer2.ts
import RabbitMQ from amqplib;const conn await RabbitMQ.connect(amqp://admin:admin1234localhost:5672);conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}
});conn.on(close, function () {console.error([AMQP] reconnecting);
});const channel await conn.createChannel();const queueName queue2;await channel.assertQueue(queueName, { durable: true });console.log([*] waiting...);// 一次只有一个未确认消息防止消费者过载
await channel.prefetch(1);await channel.bindQueue(queueName, exchange1, error, {});channel.consume(queueName,function (msg) {console.log(接收到的消息, msg?.content.toString());/*// 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {// channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},
);示例二、Topic
exchange的topic类型和direct类似使用的仍然是routeKey进行匹配转发topic支持通过*和#进行模糊查询。*代码一个具体单词,#代码0或多个单词。 producer.ts
import RabbitMQ from amqplib;async function start() {const conn await RabbitMQ.connect(amqp://admin:admin1234localhost:5672?heartbeat60);conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);return setTimeout(start, 1000);});try {const channel await conn.createChannel();console.log([AMQP] connected);const exchangeName exchange4;await channel.assertExchange(exchangeName, topic, { durable: true });let args [123.orange.456, 123.456.rabbit, lazy, lazy.123, lazy.123.456];for (let i 0; i 20; i) {// console.log(message send!, channel.sendToQueue(// queueName,// Buffer.from(发送消息,${i}${Math.ceil(Math.random() * 100000)}),// { persistent: true, correlationId: ooooooooooooooo },// 消息持久化重启后存在// // (err: any, ok: Replies.Empty){}// ));const routeKey args[Math.floor(Math.random() * args.length)];console.log(消息发送是否成功, channel.publish(exchangeName,routeKey,Buffer.from(发送消息,${i}${Math.ceil(Math.random() * 100000)},${routeKey}),{ persistent: true },));}} catch (err) {console.error([AMQP], err);return setTimeout(start, 1000);}setTimeout(() {conn.close();process.exit(0);}, 1000);
}start();consumer.ts
import RabbitMQ from amqplib;const conn await RabbitMQ.connect(amqp://admin:admin1234localhost:5672);conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}
});conn.on(close, function () {console.error([AMQP] reconnecting);
});const channel await conn.createChannel();const queueName queue1;channel.assertQueue(queueName, { durable: true });console.log([*] waiting...);// 一次只有一个未确认消息防止消费者过载
await channel.prefetch(1);// *代码一个具体单词,#代码0或多个单词
await channel.bindQueue(queueName, exchange4, *.orange.*, {});channel.consume(queueName, function (msg) {console.log(接收到的消息, msg?.content.toString());// 手动确认取消channel.ack(msg);设置noAck:false,// 自动确认消息noAck:true,不需要channel.ack(msg);try {if (msg) {channel.ack(msg);}} catch (err) {if (msg) {// 第二个参数false拒绝当前消息// 第二个参数true拒绝小于等于当前消息// 第三个参数3false从队列中清除// 第三个参数4true从新在队列中排队channel.nack(msg, false, false);}console.log(err);}
}, {// noAck: true, // 是否自动确认消息为true不需要调用channel.ack(msg);noAck: false
});consumer2.ts
import RabbitMQ, { type Replies } from amqplib/callback_api;RabbitMQ.connect(amqp://admin:admin1234localhost:5672, (err0, conn) {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName queue2;channel.assertQueue(queueName, { durable: true });console.log([*] waiting...);// 一次只有一个未确认消息防止消费者过载channel.prefetch(1);channel.bindQueue(queueName, exchange4, *.*.rabbit, {}, (err, ok) {console.log(queueName, 队列绑定结果, err, ok);});channel.bindQueue(queueName, exchange4, lazy.#, {}, (err, ok) {console.log(queueName, 队列绑定结果, err, ok);});channel.consume(queueName, function (msg) {console.log(接收到的消息, msg?.content.toString());// 手动确认取消channel.ack(msg);设置noAck:false,// 自动确认消息noAck:true,不需要channel.ack(msg);try {if (msg) {channel.ack(msg);}} catch (err) {if (msg) {// 第二个参数false拒绝当前消息// 第二个参数true拒绝小于等于当前消息// 第三个参数3false从队列中清除// 第三个参数4true从新在队列中排队channel.nack(msg, false, false);}console.log(err);}}, {// noAck: true, // 是否自动确认消息为true不需要调用channel.ack(msg);noAck: false});// returnerror事件不会把消息重新放回队列channel.on(return, (msg) {console.error(消息发送失败:, msg);});channel.on(error, (err) {console.error(通道发生错误:, err);});});conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);});
});示例三、Headers exchange类型headers根据传递的头部信息进行转发头部信息类型为object对象。在头部信息中要设置x-match属性x-match: any any下方消息匹配上一个就可以。all下方消息要全部匹配。
producer.ts
import RabbitMQ from amqplib/callback_api;function start() {RabbitMQ.connect(amqp://admin:admin1234localhost:5672?heartbeat60, function (err0, conn) {if (err0) {console.error([AMQP], err0.message);return setTimeout(start, 1000);}conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);return setTimeout(start, 1000);});console.log([AMQP] connected);conn.createChannel(async (err2, channel) {if (err2) {console.error([AMQP], err2.message);return setTimeout(start, 1000);}const exchangeName exchange5;channel.assertExchange(exchangeName,headers,{durable: true},(err, ok) {if (err) {console.log(exchange路由转发创建失败, err);} else {let args [{// x-match: any, // any,下方消息匹配上一个就可以; all,下方消息要全部匹配loglevel: info,// buslevel: product,// syslevel: admin},{// x-match: any, // any,下方消息匹配上一个就可以; all,下方消息要全部匹配// loglevel: info,buslevel: product,syslevel: admin},{// x-match: any, // any,下方消息匹配上一个就可以; all,下方消息要全部匹配// loglevel: info,buslevel: product,// syslevel: admin},{// x-match: all, // any,下方消息匹配上一个就可以; all,下方消息要全部匹配loglevel: info,buslevel: product,syslevel: admin},];for (let i 0; i 20; i) {// console.log(message send!, channel.sendToQueue(// queueName,// Buffer.from(发送消息,${i}${Math.ceil(Math.random() * 100000)}),// { persistent: true, correlationId: ooooooooooooooo },// 消息持久化重启后存在// // (err: any, ok: Replies.Empty){}// ));const routeKey args[Math.floor(Math.random() * args.length)];console.log(消息发送是否成功, routeKey, channel.publish(exchangeName,,Buffer.from(发送消息,${i}${Math.ceil(Math.random() * 100000)},${JSON.stringify(routeKey)}),{persistent: true,headers: routeKey},));}}});});setTimeout(() {conn.close();process.exit(0);}, 1000);});
}start();consumer.ts
import RabbitMQ, { type Replies } from amqplib/callback_api;RabbitMQ.connect(amqp://admin:admin1234localhost:5672, (err0, conn) {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName queue1;channel.assertQueue(queueName, { durable: true });console.log([*] waiting...);// 一次只有一个未确认消息防止消费者过载channel.prefetch(1);// *代码一个具体单词,#代码0或多个单词channel.bindQueue(queueName,exchange5,,{x-match: any, // any,下方消息匹配上一个就可以; all,下方消息要全部匹配loglevel: info,buslevel: product,syslevel: admin},(err, ok) {console.log(queueName, 队列绑定结果, err, ok);},);channel.consume(queueName, function (msg) {console.log(接收到的消息, msg?.content.toString());// 手动确认取消channel.ack(msg);设置noAck:false,// 自动确认消息noAck:true,不需要channel.ack(msg);try {if (msg) {channel.ack(msg);}} catch (err) {if (msg) {// 第二个参数false拒绝当前消息// 第二个参数true拒绝小于等于当前消息// 第三个参数3false从队列中清除// 第三个参数4true从新在队列中排队channel.nack(msg, false, false);}console.log(err);}}, {// noAck: true, // 是否自动确认消息为true不需要调用channel.ack(msg);noAck: false}, (err: any, ok: Replies.Empty) {console.log(err, ok);});// returnerror事件不会把消息重新放回队列channel.on(return, (msg) {console.error(消息发送失败:, msg);});channel.on(error, (err) {console.error(通道发生错误:, err);});});conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);});
});consumer2.ts
import RabbitMQ, { type Replies } from amqplib/callback_api;RabbitMQ.connect(amqp://admin:admin1234localhost:5672, (err0, conn) {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName queue2;channel.assertQueue(queueName, { durable: true });console.log([*] waiting...);// 一次只有一个未确认消息防止消费者过载channel.prefetch(1);channel.bindQueue(queueName,exchange5,,{x-match: all, // any,下方消息匹配上一个就可以; all,下方消息要全部匹配loglevel: info,buslevel: product,syslevel: admin},(err, ok) {console.log(queueName, 队列绑定结果, err, ok);},);channel.consume(queueName,function (msg) {console.log(接收到的消息, msg?.content.toString());/*// 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {// channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},(err: any, ok: Replies.Empty) {console.log(err, ok);},);// returnerror事件不会把消息重新放回队列channel.on(return, (msg) {console.error(消息发送失败:, msg);});channel.on(error, (err) {console.error(通道发生错误:, err);});});conn.on(error, function (err1) {if (err1.message ! Connection closing) {console.error([AMQP] conn error, err1.message);}});conn.on(close, function () {console.error([AMQP] reconnecting);});
});