什么是网站优化,wordpress php 5.2.17,住建局人员名单,如何评价一个网页的设计ConcurrentQueue开源库介绍
ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构#xff0c;适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者#xff0c;并且提供了多种配置选项来优化性能和内存使用。
Conc…ConcurrentQueue开源库介绍
ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者并且提供了多种配置选项来优化性能和内存使用。
ConcurrentQueue使用
0x01 使用场景说明
我的数据平台在接收到四种不同的业务数据时需要按数据分类写进RocketMQ。
0x02 自定义类用来存放和区分数据流
设计BusinessFlowMsg
该类有定义消息的类型该类中设计ST_BusinessSign结构体消息头用来区分消息和获取消息体的长度BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP#include string.h#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04#pragma pack(push)
#pragma pack(1)// 消息头
typedef struct s_BusinessSign
{int sign; // 业务标识unsigned int length; // 消息体的长度
}ST_BusinessSign;#pragma pack(pop)class BusinessFlowMsg
{
public:BusinessFlowMsg() default;~BusinessFlowMsg() default;char* get_data(){return _data;}int data_size(){return businessSign.length;}char* get_body(){return _data sizeof(ST_BusinessSign);}int body_size(){return data_size() - sizeof(ST_BusinessSign);}ST_BusinessSign* header(){return businessSign;}bool set_data(const char* data, int length, int sign){if(length (max_body_len sizeof(ST_BusinessSign))){return false;}businessSign.sign sign;businessSign.length length;memcpy(_data sizeof(ST_BusinessSign), data, length);return true;}private:enum{max_body_len 512 * 1024 // 512KB};ST_BusinessSign businessSign;char _data[max_body_len];
};#endif // BUSINESSFLOWMSG_HPP0x03 创建PngUnit类模拟接到不同的业务数据
PngUnit类型创建了四个线程来模拟不同的数据流。PngUnit类多线程中并未使用互斥锁因为ConcurrentQueue是一个线程安全的并发队列库事实证明确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H#include thread
#include mutexclass PngUnit
{
public:PngUnit();~PngUnit() default;void start();void sendPNG(int sign);void sendAIS(int sign);void sendRoute(int sign);void sendVoyage(int sign);private:std::thread m_th_png;std::thread m_th_ais;std::thread m_th_route;std::thread m_th_voyage;// std::mutex queue_mutex; // 互斥锁
};#endif // PNGUNIT_H#include pngunit.h
#include unistd.h
#include rocketmqutils.h
#include BusinessFlowMsg.hpp
#include json11/json11.hppPngUnit::PngUnit()
{}void PngUnit::start()
{// m_th std::thread([this](){// sendPNG(100);// });if(m_th_png.joinable()){printf([%s:%d] %s\n, __FILE__, __LINE__, m_th_png is running);return;}m_th_png std::thread(std::bind(PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));m_th_ais std::thread(std::bind(PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));m_th_route std::thread(std::bind(PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));m_th_voyage std::thread(std::bind(PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}void PngUnit::sendPNG(int sign)
{while (true){BusinessFlowMsg pngMsg;const char* pngFile 1234567890ABCDEF;int fileLen strlen(pngFile) 1;pngMsg.set_data(pngFile, fileLen, sign);{// std::lock_guardstd::mutex lock(queue_mutex);if(!RocketMQUtils::Instance()-g_businessQueue.enqueue(pngMsg)){printf(Failed to set PNG message data);}}sleep(1);}
}void PngUnit::sendAIS(int sign)
{json11::Json::object obj {{message, AIS},{response, success}};std::string jsonStr json11::Json(obj).dump();while (true){BusinessFlowMsg aisMsg;int jsonStrLen jsonStr.size();aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guardstd::mutex lock(queue_mutex);if(!RocketMQUtils::Instance()-g_businessQueue.enqueue(aisMsg)){printf(Failed to set AIS message data);}}sleep(2);}
}void PngUnit::sendRoute(int sign)
{json11::Json::object obj {{message, Route},{response, success}};std::string jsonStr json11::Json(obj).dump();while (true){BusinessFlowMsg routeMsg;int jsonStrLen jsonStr.size();routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guardstd::mutex lock(queue_mutex);if(!RocketMQUtils::Instance()-g_businessQueue.enqueue(routeMsg)){printf(Failed to set ROUTE message data);}}sleep(3);}
}void PngUnit::sendVoyage(int sign)
{json11::Json::object obj {{message, Voyage},{response, success}};std::string jsonStr json11::Json(obj).dump();while (true){BusinessFlowMsg voyageMsg;int jsonStrLen jsonStr.size();voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guardstd::mutex lock(queue_mutex);if(!RocketMQUtils::Instance()-g_businessQueue.enqueue(voyageMsg)){printf(Failed to set VOYAGE message data);}}sleep(4);}
}0x04 创建RocketMQUtils类在ConcurrentQueue队列中获取数据写进RocketMQ
RocketMQUtils类是一个单例类
#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H#include thread
#include concurrentqueue/moodycamel/concurrentqueue.h
#include BusinessFlowMsg.hppclass RocketMQUtils
{
public:static RocketMQUtils* Instance();private:RocketMQUtils();~RocketMQUtils()default;RocketMQUtils(const RocketMQUtils ) delete;RocketMQUtils operator(const RocketMQUtils ) delete;RocketMQUtils(RocketMQUtils ) delete;RocketMQUtils operator(RocketMQUtils ) delete;public:void start();void push();void poll();bool write(char *data, int len, int sign);public:moodycamel::ConcurrentQueueBusinessFlowMsg g_businessQueue;private:static RocketMQUtils* _instance;std::thread _pushThread;
};#endif // ROCKETMQUTILS_H#include rocketmqutils.h
#include unistd.hRocketMQUtils * RocketMQUtils::_instance nullptr;RocketMQUtils *RocketMQUtils::Instance()
{if(_instance nullptr){_instance new RocketMQUtils();}return _instance;
}RocketMQUtils::RocketMQUtils()
{}void RocketMQUtils::start()
{if(_pushThread.joinable()){return;}_pushThread std::thread(RocketMQUtils::push, this);
}void RocketMQUtils::push()
{while (true){BusinessFlowMsg busiMsg;if(g_businessQueue.try_dequeue(busiMsg)){write(busiMsg.get_body(), busiMsg.header()-length, busiMsg.header()-sign);}else{printf([%s:%d] %s\n, __FILE__, __LINE__, g_businessQueue is empty);sleep(2);}}
}void RocketMQUtils::poll()
{}bool RocketMQUtils::write(char *data, int len, int sign)
{std::string msg(data, len);if (sign MSG_ROCKETMQ_PNG){printf([%s:%d] [%d] %s\n, __FILE__, __LINE__, sign, msg.c_str());}else if (sign MSG_ROCKETMQ_AIS){printf([%s:%d] [%d] %s\n, __FILE__, __LINE__, sign, msg.c_str());}else if (sign MSG_ROCKETMQ_ROUTE){printf([%s:%d] [%d] %s\n, __FILE__, __LINE__, sign, msg.c_str());}else if (sign MSG_ROCKETMQ_VOYAGE){printf([%s:%d] [%d] %s\n, __FILE__, __LINE__, sign, msg.c_str());}else{printf([%s:%d] data sign error\n, __FILE__, __LINE__);}
}0x05 使用演示
#include iostream
#include memory
#include rocketmqutils.h
#include pngunit.husing namespace std;int main()
{cout Start endl;RocketMQUtils* rocketmq RocketMQUtils::Instance();rocketmq-start();std::shared_ptrPngUnit ptrPngUnit std::make_sharedPngUnit();ptrPngUnit-start();getchar();cout Over endl;return 0;
}