公司logo查询网站,网页源代码翻译器,教育企业重庆网站建设,如何做网络营销推广员文章目录 客⼾端模块实现订阅者模块信道管理模块异步⼯作线程实现连接管理模块生产者客户端消费者客户端 客⼾端模块实现
在RabbitMQ中#xff0c;提供服务的是信道#xff0c;因此在客⼾端的实现中#xff0c;弱化了Client客⼾端的概念#xff0c;也就是说在RabbitMQ中并… 文章目录 客⼾端模块实现订阅者模块信道管理模块异步⼯作线程实现连接管理模块生产者客户端消费者客户端 客⼾端模块实现
在RabbitMQ中提供服务的是信道因此在客⼾端的实现中弱化了Client客⼾端的概念也就是说在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来⽽是以⼀种提供服务的形式来体现。其实现思想类似于普通的功能接⼝封装⼀个接⼝实现⼀个功能接⼝内部完成向客⼾端请求的过程但是对外并不需要体现出客⼾端与服务端通信的概念⽤⼾需要什么服务就调⽤什么接⼝就⾏。
基于以上的思想客⼾端的实现共分为四⼤模块 基于以上模块实现⼀个客⼾端的流程也就⽐较简单了
实例化异步线程对象实例化连接对象通过连接对象创建信道根据信道获取⾃⼰所需服务关闭信道关闭连接
订阅者模块
与服务端并⽆太⼤差别客⼾端这边虽然订阅者的存在感微弱了很多但是还是有的当进⾏队列消息订阅的时候会伴随着⼀个订阅者对象的创建⽽这个订阅者对象有以下⼏个作⽤
描述当前信道订阅了哪个队列的消息。描述了收到消息后该如何对这条消息进⾏处理描述收到消息后是否需要进⾏确认回复 订阅者信息 订阅者标识订阅队列名是否⾃动确认标志回调处理函数收到消息后该如何处理的回调函数对象 using ConsumerCallback std::functionvoid(const std::string, const BasicProperties *bp, const std::string);struct Consumer {using ptr std::shared_ptrConsumer;std::string tag; //消费者标识std::string qname; //消费者订阅的队列名称bool auto_ack; //自动确认标志ConsumerCallback callback;Consumer(){DLOG(new Consumer: %p, this);}Consumer(const std::string ctag, const std::string queue_name, bool ack_flag, const ConsumerCallback cb):tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) {DLOG(new Consumer: %p, this);}~Consumer() {DLOG(del Consumer: %p, this);}};信道管理模块
同样的客⼾端也有信道其功能与服务端⼏乎⼀致或者说不管是客⼾端的channel还是服务端的channel都是为了⽤⼾提供具体服务⽽存在的只不过服务端是为客⼾端的对应请求提供服务⽽客⼾端的接⼝服务是为了⽤⼾具体需要服务也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求获取请求的服务。 信道信息 a. 信道IDb. 信道关联的⽹络通信连接对象c. protobuf协议处理对象d. 信道关联的消费者e. 请求对应的响应信息队列这⾥队列使⽤请求ID响应hash表以便于查找指定的响应f. 互斥锁条件变量⼤部分的请求都是阻塞操作发送请求后需要等到响应才能继续但是muduo库的通信是异步的因此需要我们⾃⼰在收到响应后通过判断是否是等待的指定响应来进⾏同步 信道操作 a. 提供创建信道操作b. 提供删除信道操作c. 提供声明交换机操作强断⾔-有则OK没有则创建d. 提供删除交换机e. 提供创建队列操作强断⾔-有则OK没有则创建f. 提供删除队列操作g. 提供交换机-队列绑定操作h. 提供交换机-队列解除绑定操作i . 提供添加订阅操作j. 提供取消订阅操作k. 提供发布消息操作l. 提供确认消息操作 typedef std::shared_ptrgoogle::protobuf::Message MessagePtr;using ProtobufCodecPtr std::shared_ptrProtobufCodec;using basicConsumeResponsePtr std::shared_ptrbasicConsumeResponse;using basicCommonResponsePtr std::shared_ptrbasicCommonResponse;class Channel {public:using ptr std::shared_ptrChannel;Channel(const muduo::net::TcpConnectionPtr conn, const ProtobufCodecPtr codec):_cid(UUIDHelper::uuid()), _conn(conn), _codec(codec) {}~Channel() { basicCancel(); }std::string cid() { return _cid; }bool openChannel() {std::string rid UUIDHelper::uuid();openChannelRequest req; req.set_rid(rid);req.set_cid(_cid);_codec-send(_conn, req);basicCommonResponsePtr resp waitResponse(rid);return resp-ok();}void closeChannel() {std::string rid UUIDHelper::uuid();closeChannelRequest req; req.set_rid(rid);req.set_cid(_cid);_codec-send(_conn, req);waitResponse(rid);return ;}bool declareExchange(const std::string name,ExchangeType type, bool durable, bool auto_delete,google::protobuf::Mapstd::string, std::string args) {//构造一个声明虚拟机的请求对象std::string rid UUIDHelper::uuid();declareExchangeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()-swap(args);//然后向服务器发送请求_codec-send(_conn, req);//等待服务器的响应basicCommonResponsePtr resp waitResponse(rid);//返回return resp-ok();}void deleteExchange(const std::string name) {std::string rid UUIDHelper::uuid();deleteExchangeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);_codec-send(_conn, req);waitResponse(rid);return ;}bool declareQueue(const std::string qname, bool qdurable, bool qexclusive,bool qauto_delete,google::protobuf::Mapstd::string, std::string qargs) {std::string rid UUIDHelper::uuid();declareQueueRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_durable(qdurable);req.set_auto_delete(qauto_delete);req.set_exclusive(qexclusive);req.mutable_args()-swap(qargs);_codec-send(_conn, req);basicCommonResponsePtr resp waitResponse(rid);return resp-ok();}void deleteQueue(const std::string qname) {std::string rid UUIDHelper::uuid();deleteQueueRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);_codec-send(_conn, req);waitResponse(rid);return ;}bool queueBind(const std::string ename, const std::string qname, const std::string key) {std::string rid UUIDHelper::uuid();queueBindRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);_codec-send(_conn, req);basicCommonResponsePtr resp waitResponse(rid);return resp-ok();}void queueUnBind(const std::string ename, const std::string qname) {std::string rid UUIDHelper::uuid();queueUnBindRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);_codec-send(_conn, req);waitResponse(rid);return ;}void basicPublish(const std::string ename,const BasicProperties *bp,const std::string body) {std::string rid UUIDHelper::uuid();basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_body(body);req.set_exchange_name(ename);if (bp ! nullptr) {req.mutable_properties()-set_id(bp-id());req.mutable_properties()-set_delivery_mode(bp-delivery_mode());req.mutable_properties()-set_routing_key(bp-routing_key());}_codec-send(_conn, req);waitResponse(rid);return ;}void basicAck(const std::string msgid) {if (_consumer.get() nullptr) {DLOG(消息确认时找不到消费者信息);return ;}std::string rid UUIDHelper::uuid();basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer-qname);req.set_message_id(msgid);_codec-send(_conn, req);waitResponse(rid);return;}void basicCancel() {if (_consumer.get() nullptr) {return ;}std::string rid UUIDHelper::uuid();basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer-qname);req.set_consumer_tag(_consumer-tag);_codec-send(_conn, req);waitResponse(rid);_consumer.reset();return;}bool basicConsume(const std::string consumer_tag,const std::string queue_name,bool auto_ack,const ConsumerCallback cb) {if (_consumer.get() ! nullptr) {DLOG(当前信道已订阅其他队列消息);return false;}std::string rid UUIDHelper::uuid();basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(queue_name);req.set_consumer_tag(consumer_tag);req.set_auto_ack(auto_ack);_codec-send(_conn, req);basicCommonResponsePtr resp waitResponse(rid);if (resp-ok() false) {DLOG(添加订阅失败);return false;}_consumer std::make_sharedConsumer(consumer_tag, queue_name, auto_ack, cb);return true;}public: //连接收到基础响应后向hash_map中添加响应void putBasicResponse(const basicCommonResponsePtr resp) {std::unique_lockstd::mutex lock(_mutex);_basic_resp.insert(std::make_pair(resp-rid(), resp));_cv.notify_all();}//连接收到消息推送后需要通过信道找到对应的消费者对象通过回调函数进行消息处理void consume(const basicConsumeResponsePtr resp) {if (_consumer.get() nullptr) {DLOG(消息处理时未找到订阅者信息);return;}if (_consumer-tag ! resp-consumer_tag()) {DLOG(收到的推送消息中的消费者标识与当前信道消费者标识不一致);return ;}_consumer-callback(resp-consumer_tag(), resp-mutable_properties(), resp-body());}private:basicCommonResponsePtr waitResponse(const std::string rid) {std::unique_lockstd::mutex lock(_mutex);_cv.wait(lock, [rid, this](){return _basic_resp.find(rid) ! _basic_resp.end();});//while(condition()) _cv.wait();basicCommonResponsePtr basic_resp _basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_mapstd::string, basicCommonResponsePtr _basic_resp;};信道管理 a. 创建信道b. 查询信道c. 删除信道 class ChannelManager {public:using ptr std::shared_ptrChannelManager;ChannelManager(){}Channel::ptr create(const muduo::net::TcpConnectionPtr conn, const ProtobufCodecPtr codec) {std::unique_lockstd::mutex lock(_mutex);auto channel std::make_sharedChannel(conn, codec);_channels.insert(std::make_pair(channel-cid(), channel));return channel;}void remove(const std::string cid) {std::unique_lockstd::mutex lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string cid) {std::unique_lockstd::mutex lock(_mutex);auto it _channels.find(cid);if (it _channels.end()) {return Channel::ptr();}return it-second;}private:std::mutex _mutex;std::unordered_mapstd::string, Channel::ptr _channels;};异步⼯作线程实现
客⼾端这边存在两个异步⼯作线程
⼀个是muduo库中客⼾端连接的异步循环线程EventLoopThread⼀个是当收到消息后进⾏异步处理的⼯作线程池。
这两项都不是以连接为单元进⾏创建的⽽是创建后可以⽤以多个连接中因此单独进⾏封装。
class AsyncWorker {public:using ptr std::shared_ptrAsyncWorker;muduo::net::EventLoopThread loopthread;threadpool pool;};连接管理模块
在客⼾端这边RabbitMQ弱化了客⼾端的概念因为⽤⼾所需的服务都是通过信道来提供的因此操作思想转换为先创建连接通过连接创建信道通过信道提供服务这⼀流程。
这个模块同样是针对muduo库客⼾端连接的⼆次封装向⽤⼾提供创建channel信道的接⼝创建信道后可以通过信道来获取指定服务。 class Connection{public:using ptr std::shared_ptrConnection;Connection(const std::string sip, int sport, const AsyncWorker::ptr worker): _latch(1), _client(worker-loopthread.startLoop(), muduo::net::InetAddress(sip, sport), Client),_dispatcher(std::bind(Connection::onUnknownMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3)),_codec(std::make_sharedProtobufCodec(std::bind(ProtobufDispatcher::onProtobufMessage, _dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_sharedChannelManager()){_dispatcher.registerMessageCallbackbasicCommonResponse(std::bind(Connection::basicResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallbackbasicConsumeResponse(std::bind(Connection::consumeResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(Connection::onConnection, this, std::placeholders::_1));_client.connect();_latch.wait(); // 阻塞等待直到连接建立成功}Channel::ptr openChannel(){Channel::ptr channel _channel_manager-create(_conn, _codec);bool ret channel-openChannel();if (ret false){DLOG(打开信道失败);return Channel::ptr();}return channel;}void closeChannel(const Channel::ptr channel){channel-closeChannel();_channel_manager-remove(channel-cid());}private:void basicResponse(const muduo::net::TcpConnectionPtr conn, const basicCommonResponsePtr message, muduo::Timestamp){// 1. 找到信道Channel::ptr channel _channel_manager-get(message-cid());if (channel.get() nullptr){DLOG(未找到信道信息);return;}// 2. 将得到的响应对象添加到信道的基础响应hash_map中channel-putBasicResponse(message);}void consumeResponse(const muduo::net::TcpConnectionPtr conn, const basicConsumeResponsePtr message, muduo::Timestamp){// 1. 找到信道Channel::ptr channel _channel_manager-get(message-cid());if (channel.get() nullptr){DLOG(未找到信道信息);return;}// 2. 封装异步任务消息处理任务抛入线程池_worker-pool.push([channel, message](){ channel-consume(message); });}void onUnknownMessage(const muduo::net::TcpConnectionPtr conn, const MessagePtr message, muduo::Timestamp){LOG_INFO onUnknownMessage: message-GetTypeName();conn-shutdown();}void onConnection(const muduo::net::TcpConnectionPtr conn){if (conn-connected()){_latch.countDown(); // 唤醒主线程中的阻塞_conn conn;}else{// 连接关闭时的操作_conn.reset();}}private:muduo::CountDownLatch _latch; // 实现同步的muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接muduo::net::TcpClient _client; // 客户端ProtobufDispatcher _dispatcher; // 请求分发器ProtobufCodecPtr _codec; // 协议处理器AsyncWorker::ptr _worker;ChannelManager::ptr _channel_manager;};生产者客户端 publish_client.cc 实例化异步工作线程对象实例化连接对象通过连接创建信道通过信道提供的服务完成所需循环向交换机发布消息关闭信道
#include connection.hppint main()
{//1. 实例化异步工作线程对象nzq::AsyncWorker::ptr awp std::make_sharednzq::AsyncWorker();//2. 实例化连接对象nzq::Connection::ptr conn std::make_sharednzq::Connection(127.0.0.1,8085,awp);//3. 通过连接创建信道nzq::Channel::ptr channel conn-openChannel();//4. 通过信道提供的服务完成所需// 1. 声明一个交换机exchange1, 交换机类型为广播模式google::protobuf::Mapstd::string,std::string tmp_map;channel-declareExchange(exchange1, nzq::ExchangeType::TOPIC, true, false, tmp_map);// 2. 声明一个队列queue1channel-declareQueue(queue1, true, false, false, tmp_map);// 3. 声明一个队列queue2channel-declareQueue(queue2, true, false, false, tmp_map);// 4. 绑定queue1-exchange1且binding_key设置为queue1channel-queueBind(exchange1, queue1, queue1);// 5. 绑定queue2-exchange1且binding_key设置为news.music.#channel-queueBind(exchange1, queue2, news.music.#);//5. 循环向交换机发布消息for (int i 0; i 10; i) {nzq::BasicProperties bp;bp.set_id(nzq::UUIDHelper::uuid());bp.set_delivery_mode(nzq::DeliveryMode::DURABLE);bp.set_routing_key(news.music.pop);channel-basicPublish(exchange1, bp, Hello World- std::to_string(i));}nzq::BasicProperties bp;bp.set_id(nzq::UUIDHelper::uuid());bp.set_delivery_mode(nzq::DeliveryMode::DURABLE);bp.set_routing_key(news.music.sport);channel-basicPublish(exchange1, bp, Hello linux);bp.set_routing_key(news.sport);channel-basicPublish(exchange1, bp, Hello chileme?);//6. 关闭信道conn-closeChannel(channel);return 0;return 0;
}消费者客户端 同生产者客户端如何消费需要自己设置下面只是其中一种 void cb(nzq::Channel::ptr channel, const std::string consumer_tag, const nzq::BasicProperties *bp, const std::string body)
{std::cout consumer_tag 消费了消息 body std::endl;channel-basicAck(bp-id());
}
int main(int argc, char *argv[])
{if (argc ! 2) {std::cout usage: ./consume_client queue1\n;return -1;}//1. 实例化异步工作线程对象nzq::AsyncWorker::ptr awp std::make_sharednzq::AsyncWorker();//2. 实例化连接对象nzq::Connection::ptr conn std::make_sharednzq::Connection(127.0.0.1, 8085, awp);//3. 通过连接创建信道nzq::Channel::ptr channel conn-openChannel();//4. 通过信道提供的服务完成所需// 1. 声明一个交换机exchange1, 交换机类型为广播模式google::protobuf::Mapstd::string, std::string tmp_map;channel-declareExchange(exchange1, nzq::ExchangeType::TOPIC, true, false, tmp_map);// 2. 声明一个队列queue1channel-declareQueue(queue1, true, false, false, tmp_map);// 3. 声明一个队列queue2channel-declareQueue(queue2, true, false, false, tmp_map);// 4. 绑定queue1-exchange1且binding_key设置为queue1channel-queueBind(exchange1, queue1, queue1);// 5. 绑定queue2-exchange1且binding_key设置为news.music.#channel-queueBind(exchange1, queue2, news.music.#);auto functor std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel-basicConsume(consumer1, argv[1], false, functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));conn-closeChannel(channel);return 0;
}