贵阳建设网站公司,wordpress代码修改用户权限,为什么做的网站要续费,企业空间目录 1.介绍2.安装1.RabbitMq2.客户端库 3.AMQP-CPP 简单使用1.介绍2.使用 4.类与接口1.Channel2.ev 5.使用1.publish.cc2.consume.cc3.makefile 1.介绍
RabbitMQ#xff1a;消息队列组件#xff0c;实现两个客户端主机之间消息传输的功能(发布订阅)核心概念#xff1… 目录 1.介绍2.安装1.RabbitMq2.客户端库 3.AMQP-CPP 简单使用1.介绍2.使用 4.类与接口1.Channel2.ev 5.使用1.publish.cc2.consume.cc3.makefile 1.介绍
RabbitMQ消息队列组件实现两个客户端主机之间消息传输的功能(发布订阅)核心概念交换机、队列、绑定、消息交换机类型 广播交换当交换机收到消息则将消息发布到所有绑定的队列中直接交换根据消息中的bkey与绑定的rkey对比一致则放入队列主题交换使用bkey与绑定的rkey进行规则匹配成功则放入队列 2.安装
1.RabbitMq
安装sudo apt install rabbitmq-server简单使用# 安装完成的时候默认有个用户guest但是权限不够要创建一个administrator用户才可以做为远程登录和发表订阅消息#添加用户
sudo rabbitmqctl add_user root PASSWORD#设置用户tag
sudo rabbitmqctl set_user_tags root administrator #设置用户权限
sudo rabbitmqctl set_permissions -p / root . . .* # RabbitMQ自带了web管理界面, 执行下面命令开启, 默认端口15672
sudo rabbitmq-plugins enable rabbitmq_management 2.客户端库
C语言库C库sudo apt install libev-dev #libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
make install如果安装时出现以下报错则表示ssl版本出现问题/usr/include/openssl/macros.h:147:4: error: #error
OPENSSL_API_COMPAT expresses an impossible API compatibility
level 147 | # error OPENSSL_API_COMPAT expresses an impossible API
compatibility level | ^~~~~
In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor,
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str,
unsigned short *port_ptr))解决方案卸载当前的ssl库重新进行修复安装dpkg -l | grep ssl
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install3.AMQP-CPP 简单使用
1.介绍
AMQP-CPP是用于与RabbitMq消息中间件通信的C库 它能解析从RabbitMq服务发送来的数据也可以生成发向RabbitMq的数据包AMQP-CPP库不会向RabbitMq建立网络连接所有的网络IO由用户完成 AMQP-CPP提供了可选的网络层接口它预定义了TCP模块用户就不用自己实现网络IO 也可以选择libevent、libev、libuv、asio等异步通信组件 需要手动安装对应的组件 AMQP-CPP完全异步没有阻塞式的系统调用不使用线程就能够应用在高性能应用中注意它需要C17的支持 2.使用
AMQP-CPP的使用有两种模式 使用默认的TCP模块进行网络通信使用扩展的libevent、libev、libuv、asio异步通信组件进行通信 此处以libev为例不需要自己实现monitor函数可以直接使用AMQP::LibEvHandler 4.类与接口
1.Channel
channel是一个虚拟连接一个连接上可以建立多个通道 并且所有的RabbitMq指令都是通过channel传输 所以连接建立后的第一步就是建立channel 因为所有操作是异步的所以在channel上执行指令的返回值并不能作为操作执行结果 实际上它返回的是Deferred类可以使用它安装处理函数
namespace AMQP
{ /** * Generic callbacks that are used by many deferred objects */ using SuccessCallback std::functionvoid(); using ErrorCallback std::functionvoid(const char *message); using FinalizeCallback std::functionvoid();/** * Declaring and deleting a queue */ using QueueCallback std::functionvoid(const std::string name, uint32_t messagecount, uint32_t consumercount);using DeleteCallback std::functionvoid(uint32_t deletedmessages); using MessageCallback std::functionvoid(const Message message, uint64_t deliveryTag, bool redelivered); // 当使用发布者确认时当服务器确认消息已被接收和处理时将调用AckCallback using AckCallback std::functionvoid(uint64_t deliveryTag, bool multiple);// 使用确认包裹通道时当消息被ack/nacked时会调用这些回调 using PublishAckCallback std::functionvoid(); using PublishNackCallback std::functionvoid(); using PublishLostCallback std::functionvoid(); // 信道类class Channel { Channel(Connection *connection); bool connected();/** *声明交换机 *如果提供了一个空名称则服务器将分配一个名称。 *以下flags可用于交换机 * *-durable 持久化重启后交换机依然有效 *-autodelete 删除所有连接的队列后自动删除交换 *-passive 仅被动检查交换机是否存在 *-internal 创建内部交换 * *param name 交换机的名称 *param-type 交换类型 enum ExchangeType { fanout, 广播交换绑定的队列都能拿到消息 direct, 直接交换只将消息交给routingkey一致的队列 topic, 主题交换将消息交给符合bindingkey规则的队列 headers, consistent_hash, message_deduplication }; *param flags 交换机标志 *param arguments其他参数 * *此函数返回一个延迟处理程序。可以安装回调 using onSuccess(), onError() and onFinalize() methods. */ Deferred declareExchange(const std::string_view name,ExchangeType type,int flags,const Table arguments);/** *声明队列 *如果不提供名称服务器将分配一个名称。 *flags可以是以下值的组合 * *-durable 持久队列在代理重新启动后仍然有效 *-autodelete 当所有连接的使用者都离开时自动删除队列 *-passive 仅被动检查队列是否存在 *-exclusive 队列仅存在于此连接并且在连接断开时自动删除 * *param name 队列的名称 *param flags 标志组合 *param arguments 可选参数 * *此函数返回一个延迟处理程序。可以安装回调 *使用onSuccess、onError和onFinalize方法。 * Deferred onError(const char *message) * *可以安装的onSuccess回调应该具有以下签名 void myCallback(const std::string name, uint32_t messageCount, uint32_t consumerCount); 例如 channel.declareQueue(myqueue).onSuccess( [](const std::string name, uint32_t messageCount, uint32_t consumerCount) { std::cout Queue name ; std::cout has been declared with ; std::cout messageCount; std::cout messages and ; std::cout consumerCount; std::cout consumers std::endl; * }); */ DeferredQueue declareQueue(const std::string_view name,int flags,const Table arguments);/** *将队列绑定到交换机 * *param exchange 源交换机 *param queue 目标队列 *param routingkey 路由密钥 *param arguments 其他绑定参数 * *此函数返回一个延迟处理程序。可以安装回调 *使用onSuccess、onError和onFinalize方法。 */ Deferred bindQueue(const std::string_view exchange,const std::string_view queue,const std::string_view routingkey,const Table arguments);/** *将消息发布到exchange*您必须提供交换机的名称和路由密钥。 然后RabbitMQ将尝试将消息发送到一个或多个队列。 使用可选的flags参数可以指定如果消息无法路由到队列时应该发生的情况。默认情况下不可更改的消息将被静默地丢弃。 * *如果设置了mandatory或immediate标志 则无法处理的消息将返回到应用程序。 在开始发布之前请确保您已经调用了recall-方法 并设置了所有适当的处理程序来处理这些返回的消息。 * *可以提供以下flags * *-mandatory 如果设置服务器将返回未发送到队列的消息 *-immediate 如果设置服务器将返回无法立即转发给使用者的消息。 *param exchange要发布到的交易所 *param routingkey路由密钥 *param envelope要发送的完整信封 *param message要发送的消息 *param size消息的大小 *param flags可选标志 */ bool publish(const std::string_view exchange,const std::string_view routingKey,const std::string message,int flags 0);/** *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息 * *调用此方法后RabbitMQ开始向客户端应用程序传递消息。 consumer tag是一个字符串标识符 如果您以后想通过channel:cancel调用停止它 可以使用它来标识使用者。 *如果您没有指定使用者tag服务器将为您分配一个。 * *支持以下flags * *-nolocal 如果设置了则不会同时消耗在此通道上发布的消息 *-noack 如果设置了则不必对已消费的消息进行确认 *-exclusive 请求独占访问只有此使用者可以访问队列 * *param queue 您要使用的队列 *param tag 将与此消费操作关联的消费者标记 *param flags 其他标记 *param arguments其他参数 * *此函数返回一个延迟处理程序。 可以使用onSuccess、onError和onFinalize方法安装回调可以安装的onSuccess回调应该具有以下格式 void myCallbackconst std:string_viewtag 样例 channel.consume(myqueue).onSuccess( [](const std::string_view tag) { std::cout Started consuming under tag std::cout tag std::endl; }); */ DeferredConsumer consume(const std::string_view queue,const std::string_view tag,int flags,const Table arguments);/** *确认接收到的消息 **消费者客户端对收到的消息进行确认应答**当在DeferredConsumer:onReceived()方法中接收到消息时 必须确认该消息 以便RabbitMQ将其从队列中删除除非使用noack选项消费* *支持以下标志 * *-多条确认多条消息之前传递的所有未确认消息也会得到确认 * *param deliveryTag 消息的唯一delivery标签 *param flags 可选标志 *return bool */ bool ack(uint64_t deliveryTag, int flags0);};class DeferredConsumer { /* 注册一个回调函数该函数在消费者启动时被调用void onSuccess(const std::string consumertag) */ DeferredConsumer onSuccess(const ConsumeCallback callback);/* 注册回调函数用于接收到一个完整消息的时候被调用 void MessageCallback(const AMQP::Message message, uint64_t deliveryTag, bool redelivered) */ DeferredConsumer onReceived(const MessageCallback callback);/* Alias for onReceived() */ DeferredConsumer onMessage(const MessageCallback callback);/* 注册要在服务器取消消费者时调用的函数 void CancelCallback(const std::string tag) */ DeferredConsumer onCancelled(const CancelCallback callback);};class Message : public Envelope{ const std::string exchange();const std::string routingkey();};class Envelope : public MetaData{ const char *body(); // 获取消息正文uint64_t bodySize(); // 获取消息正文大小};
}2.ev
typedef struct ev_async
{ EV_WATCHER (ev_async);EV_ATOMIC_T sent; /* private */
}ev_async; //break type
enum
{ EVBREAK_CANCEL 0, /* undo unloop */ EVBREAK_ONE 1, /* unloop once */ EVBREAK_ALL 2 /* unloop all loops */
}; // 实例化并获取IO事件监控接口句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP ( 0));
# define EV_DEFAULT ev_default_loop (0) // 开始运行IO事件监控, 这是一个阻塞接口
int ev_run (struct ev_loop *loop);/* break out of the loop */
// 结束IO监控
// 如果在主线程进行ev_run(), 则可以直接调用,
// 如果在其他线程中进行ev_run(), 需要通过异步通知进行
void ev_break (struct ev_loop *loop, int32_t break_type) ; void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);// 初始化异步事件结构, 并设置回调函数
void ev_async_init(ev_async *w, callback cb);
// 启动事件监控循环中的异步任务处理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送当前异步事件到异步线程中执行
void ev_async_send(struct ev_loop *loop, ev_async *w);5.使用
1.publish.cc
#include ev.h
#include amqpcpp.h
#include amqpcpp/libev.h
#include openssl/ssl.h
#include openssl/opensslv.hint main()
{// 1.实例化底层网络通信框架的IO事件监控句柄auto *loop EV_DEFAULT;// 2.实例化libEvHandler句柄 - 将AMQP框架与事件监控关联起来AMQP::LibEvHandler handler(loop);// 3.实例化连接对象AMQP::Address address(amqp://root:SnowK8989127.0.0.1:5672/);AMQP::TcpConnection connection(handler, address);// 4.实例化信道对象AMQP::TcpChannel channel(connection);// 5.声明交换机channel.declareExchange(test-exchange, AMQP::ExchangeType::direct).onError([](const char *message){ std::cout 声明交换机失败: message std::endl; }).onSuccess([](){ std::cout test-exchange 交换机创建成功 std::endl; });// 6.声明队列channel.declareQueue(test-queue).onError([](const char *message){ std::cout 声明队列失败: message std::endl; }).onSuccess([](){ std::cout test-queue 队列创建成功 std::endl; });// 7.针对交换机和队列进行绑定channel.bindQueue(test-exchange, test-queue, test-queue-key).onError([](const char *message){ std::cout test-exchange - test-queue 绑定失败: \ message std::endl; }).onSuccess([](){ std::cout test-exchange - test-queue 绑定成功 std::endl; });// 8.向交换机发布消息for (int i 0; i 5; i){std::string msg Hello SnowK- std::to_string(i);if(channel.publish(test-exchange, test-queue-key, msg) false){std::cout publish 失败 std::endl;}}// 9.启动底层网络通信框架 - 开启IOev_run(loop, 0);return 0;
}2.consume.cc
#include ev.h
#include amqpcpp.h
#include amqpcpp/libev.h
#include openssl/ssl.h
#include openssl/opensslv.hvoid MessageCB(AMQP::TcpChannel* channel, const AMQP::Message message, uint64_t deliveryTag, bool redelivered)
{std::string msg;msg.assign(message.body(), message.bodySize());// 不能这样使用, AMQP::Message后面没有存\0// std::cout message std::endl std::cout msg std::endl;channel-ack(deliveryTag);
}int main()
{// 1.实例化底层网络通信框架的IO事件监控句柄auto *loop EV_DEFAULT;// 2.实例化libEvHandler句柄 - 将AMQP框架与事件监控关联起来AMQP::LibEvHandler handler(loop);// 3.实例化连接对象AMQP::Address address(amqp://root:SnowK8989127.0.0.1:5672/);AMQP::TcpConnection connection(handler, address);// 4.实例化信道对象AMQP::TcpChannel channel(connection);// 5.声明交换机channel.declareExchange(test-exchange, AMQP::ExchangeType::direct).onError([](const char *message){ std::cout 声明交换机失败: message std::endl; }).onSuccess([](){ std::cout test-exchange 交换机创建成功 std::endl; });// 6.声明队列channel.declareQueue(test-queue).onError([](const char *message){ std::cout 声明队列失败: message std::endl; }).onSuccess([](){ std::cout test-queue 队列创建成功 std::endl; });// 7.针对交换机和队列进行绑定channel.bindQueue(test-exchange, test-queue, test-queue-key).onError([](const char *message){ std::cout test-exchange - test-queue 绑定失败: \ message std::endl; }).onSuccess([](){ std::cout test-exchange - test-queue 绑定成功; });// 8.订阅消息对垒 - 设置消息处理回调函数auto callback std::bind(MessageCB, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume(test-queue, consume-tag).onReceived(callback).onError([](const char *message){ std::cout 订阅 test-queue 队列消息失败: message std::endl;exit(0); });// 9.启动底层网络通信框架 - 开启IOev_run(loop, 0);return 0;
}3.makefile
all: publish consume
publish: publish.ccg -o $ $^ -lamqpcpp -lev -stdc17
consume: consume.ccg -o $ $^ -lamqpcpp -lev -stdc17.PHONY:clean
clean:rm publish consume