广东佛山建网站,竞价开户推广,wordpress转成中文版,html网站开发心得体会只要摆脱锁#xff0c;实现支持安全并发访问的数据结构#xff0c;就有可能解决大粒度锁影响并发程度以及错误的加锁方式导致死锁的问题。这种数据结构称为无锁数据结构。
在了解本文时#xff0c;务必读懂内存次序章节。
在设计无锁数据结构时#xff0c;需要极为小心谨…只要摆脱锁实现支持安全并发访问的数据结构就有可能解决大粒度锁影响并发程度以及错误的加锁方式导致死锁的问题。这种数据结构称为无锁数据结构。
在了解本文时务必读懂内存次序章节。
在设计无锁数据结构时需要极为小心谨慎因为它们的正确实现相当不容易。导致代码出错的情形难以复现。 1 定义和推论
算法和数据结构中只要采用了互斥条件变量或future进行同步操作就称之为阻塞型算法和阻塞型数据结构。
如果应用程序调用某些库函数发起调用的线程便会暂停运行即在函数的调用点阻塞等到另一线程完成某项相关操作阻塞才会解除前者才会继续运行。这种库函数的调用被命名为阻塞型调用。 1.1 非阻塞型数据结构
在实践中我们需要参考下列定义根据适用的条款分辨该型别/函数属于哪一类
1 无阻碍假定其他线程全部暂停则目标线程将在有限步骤内完成自己的操作。
2 无锁如果多个线程共同操作一份数据那么在有限步骤内其中某一线程能够完成自己的操作。
3 免等在某份数据上每个线程经过有限步骤就能完成自己的操作即使该份数据同时被其他多个线程所操作。
绝大多数时候无障碍算法并不切实有用因为其他线程全部暂停这对于一个项目来说是一个非常匪夷所思的场景。
1.2 无锁数据结构
免等和无锁数据结构能够避免线程受饿问题也就是说两个并发执行的线程其中一个按部就班的执行操作另一个总是在错误的时机开始执行操作导致被迫中止反复开始试图完成操作。 1.3 免等的数据结构
是具备额外功能的无锁数据结构如果它被多个线程访问不论其他线程上发生了什么每个线程都能在有限的步骤内完成自己的操作。若多个线程之间存在冲突导致某算法无限制地反复尝试执行操作那它就是免等算法比如使用while循环进行的一些操作在里面执行比较-交换操作。 1.4 无锁数据结构的优点和缺点
1.4.1 优点
本质上使用无锁数据结构的首要原因是最大限度地实现并发。
基于锁的实现往往导致线程需要阻塞在无锁数据结构上总是存在某个线程能执行下一步操作。免等数据结构则完全无需等待但是难以实现很容易写成自旋锁。
还有一点是代码健壮性。
假设数据结构的写操作受锁保护如果线程在持锁期间终止那么该数据结构仅完成了部分改动且此后无从修补。但是若某线程操作无锁数据结构时意外终结则丢失的数据仅限于它持有的部分其他数据依然完好能被别的线程进行处理不会阻塞等待锁 1.4.2 缺点。需要注意的地方
1.4.2.1 不变量相关
力求保持不变量成立或选取别的可以一直成立的不变量作为替代。 1.4.2.2 留心内存次序约束 1.4.2.3 数据修改使用原子操作 1.4.2.4 就其他线程所见各项修改步骤次序正确 1.4.2.5 避免活锁
假设两个线程同时更改同一份数据结构若它们所做的改动都导致对方从头开始操作那双方就会反复循环不断重试这种现象称为活锁。它的出现与否完全取决于现成的调度次序故往往只会短暂存在。因此它们仅降低程序性能不会导致严重问题。也因此可能会导致提高了操作同一个数据结构的并发程度缩短了单个线程因等待消耗的时间却降低了整体的性能。 1.4.2.6 缓存乒乓
并且如果多个线程访问相同的原子变量硬件必须在线程之间同步数据这还会造成缓存乒乓现象导致严重的性能损耗。 2 无锁数据结构范例 无锁数据结构依赖原子操作和内存次序约束作用是令其他线程按正确的内存次序见到数据操作的过程默认内存次序std::memory_order_seq_cst最易于分析和推理全部该次序的操作形成确定且唯一的总序列 2.1 实现线程安全的无锁栈
需要保证
1 一旦某线程将一项数据加入栈容器就能立即安全地被另一线程取出
2 只有唯一一个线程能获取该项数据 #include atomic
#include memorytemplatetypename T
class lock_free_stack {
private:struct node {std::shared_ptrT data;node* next;node(T const data_) : data(std::make_sharedT(data_)) {}};std::atomicnode* head;
public:void push(T const data) {node* const new_node new node(data);new_node-next head.load();while (!head.compare_exchange_weak(new_node-next, new_node));}std::shared_ptrT pop() {node* old_head head.load();while(old_head !head.compare_exchange_weak(old_head, old_head-next));return old_head ? old_head-data : std::shared_ptrT();}
};
比较交换操作如果head指针与第一个参数new_node-next所存储值相同将head改为指向第二个参数new_nodecompare_exchange_weak返回true。如果head指针与第一个参数new_node-next所存储值不同表示head指针被其他线程修改过第一个参数new_node-next就被更新成head指针的当前值并且compare_exchange_weak返回false让循环继续。 上述代码虽然是无锁实现但是却是非免等的如果compare_exchange_weak总是false理论上push和pop中的while循环要持续进行。 2.2 制止内存泄漏在无锁数据结构中管理内存
本质问题是若要删除某节点必须先行确认其他线程并未持有指向该节点的指针。
对于上述实现若有多个线程同时调用pop需要采取措施判断何时删除节点。
可以维护一个等待删除链表。
#include atomic
#include memorytemplatetypename T
class lock_free_stack {
private:struct node {std::shared_ptrT data;node* next;node(T const data_) : data(std::make_sharedT(data_)) {}};std::atomicnode* head;std::atomicunsigned threads_in_pop;std::atomicnode * to_be_deleted;static void delete_nodes(node* nodes) {while (nodes) {node* next nodes-next;delete nodes;nodes next;}}void try_reclaim(node* old_head) {if (threads_in_pop 1) {node* nodes_to_delete to_be_deleted.exchange(nullptr);if (!--threads_in_pop) {delete_nodes(nodes_to_delete);} else if (nodes_to_delete) {chain_pending_nodes(nodes_to_delete);}delete old_head;} else {chain_pending_node(old_head);--threads_in_pop;}}void chain_pending_nodes(node* nodes) {node* last nodes;while (node* const next last-next) {last next;}chain_pending_nodes(nodes, last);}void chain_pending_nodes(node* first, node* last) {last-next to_be_deleted;while (!to_be_deleted.compare_exchange_weak(last-next, first));}void chain_pending_node(node* n) {chain_pending_nodes(n, n);}
public:void push(T const data) {node* const new_node new node(data);new_node-next head.load();while (!head.compare_exchange_weak(new_node-next, new_node));}std::shared_ptrT pop() {threads_in_pop;node* old_head head.load();while(old_head !head.compare_exchange_weak(old_head, old_head-next));std::shared_ptrT res;if (old_head) {res.swap(old_head-data);}try_reclaim(old_head);return res;}
}; 2.3 运用风险指针检测无法回收的节点
术语“风险指针”是一种技法得名缘由是若某节点仍被其他线程指涉而我们依然删除它此举便成了“冒险”动作。删除目标节点后别的线程还持有指向它的引用还通过这一引用对其进行访问便会导致程序产生未定义行为。
上述机制产生的基本思想假设当前线程要访问某对象而它却即将被别的线程删除那就让当前线程设置指涉目标对象的风险指针以通知其他线程删除该对象将产生实质风险。若程序不再需要该对象风险指针被清零。
#include atomic
#include thread
unsigned const max_hazard_pointers100;
struct hazard_pointer {std::atomicstd::thread::id id;std::atomicvoid* pointer;
};hazard_pointer hazard_pointers[max_hazard_pointers];class hp_owner {hazard_pointer* hp;public:hp_owner(hp_owner const)delete;hp_owner operator(hp_owner const)delete;hp_owner(): hp(nullptr) {for (unsigned i 0; i max_hazard_pointers; i) {std::thread::id old_id;if (hazard_pointers[i].id.compare_exchange_strong(old_id, std::this_thread::get_id())) {hp hazard_pointers[i];break;}}if (!hp) {throw std::runtime_error(No hazard);}}std::atomicvoid* get_pointer() {return hp-pointer;}~hp_owner() {hp-pointer.store(nullptr);hp-id.store(std::thread::id());}
};std::atomicvoid* get_hazard_pointer_for_current_thread() {thread_local static hp_owner hazard;return hazard.get_pointer();
} 2.4 借引用计数检测正在使用中的节点 #include atomic
#include memorytemplatetypename T
class lock_free_stack {
private:struct node {std::shared_ptrT data;node* next;node(T const data_) : data(std::make_sharedT(data_)) {}};std::shared_ptrnode head;
public:void push(T const data) {std::shared_ptrnode const new_node std::make_sharednode(data);new_node-next std::atomic_load(head);while (!std::atomic_compare_exchange_weak(head, new_node-next, new_node));}std::shared_ptrT pop() {std::shared_ptrnode old_head std::atomic_load(head);while (old_head std::atomic_compare_exchange_weak(head, old_head, std::atomic_load(old_head-next)));if (old_head) {std::atomic_store(old_head-next, std::shared_ptrnode());return old_head-next;}return std::shared_ptrT();}~lock_free_stack() {while (pop());}
};
引用计数针对各个节点分别维护一个计数器随时了解访问它的线程数目。
std::shared_ptr的引用计数在这里无法借鉴因为他的原子特性不一定通过无锁机制实现若强行按照无锁方式实现该指针类的原子操作很可能造成额外开销。 2.4.1 std::experimental::atomic_shared_ptrT
std::shared_ptr无法结合std::atomic使用原因是std::shared_ptrT并不具备平实拷贝语义。但是std::experimental::atomic_shared_ptrT支持因此可以正确的处理引用计数同时令操作原子化。
#include atomic
#include memorytemplatetypename T
class lock_free_stack {
private:struct node {std::shared_ptrT data;std::experimental::atomic_shared_ptrnode next;node(T const data_) : data(std::make_sharedT(data_)) {}};std::experimental::atomic_shared_ptrnode head;
public:void push(T const data) {std::shared_ptrnode const new_node std::make_sharednode(data);new_node-next std::atomic_load(head);while (!std::atomic_compare_exchange_weak(head, new_node-next, new_node));}std::shared_ptrT pop() {std::shared_ptrnode old_head std::atomic_load(head);while (old_head std::atomic_compare_exchange_weak(head, old_head, std::atomic_load(old_head-next)));if (old_head) {std::atomic_store(old_head-next, std::shared_ptrnode());return old_head-data;}return std::shared_ptrT();}~lock_free_stack() {while (pop());}
};
2.4.2 内、外部计数器进行引用计数
一种经典的实现是使用两个计数器内、外部计数器各一。两个计数器之和即为节点的总引用数目。
外部计数器与节点的指针组成结构体每当指针被读取外部计数器自增。
内部计数器位于节点之中随着节点读取完成自减。
#include atomic
#include memorytemplatetypename T
class lock_free_stack {
private:struct node;struct counted_node_ptr {int external_count;node* ptr;};struct node {std::shared_ptrT data;std::atomicint internal_count;counted_node_ptr next;node(T const data_) : data(std::make_sharedT(data_)), internal_count(0) {}};std::atomiccounted_node_ptr head;void increase_head_count(counted_node_ptr old_counter) {counted_node_ptr new_counter;do {new_counter old_counter;new_counter.external_count;} while (!head.compare_exchange_strong(old_counter, new_counter));old_counter.external_count new_counter.external_count;}
public:std::shared_ptrT pop() {counted_node_ptr old_head head.load();for (;;) {increase_head_count(old_head);node* const ptr old_head.ptr;if (!ptr) {return std::shared_ptrT();}if (head.compare_exchange_strong(old_head, ptr-next)) {std::shared_ptrT res;res.swap(ptr-data);int const count_increase old_head.external_count - 2;if (ptr-internal_count.fetch_add(count_increase) -count_increase) {delete ptr;}return res;} else if (ptr-internal_count.fetch_sub(1) 1) {delete ptr;}}}void push(T const data) {counted_node_ptr new_node;new_node.ptr new node(data);new_node.external_count 1; // head指针本身算作一个外部引用new_node.ptr-next head.load();while (!head.compare_exchange_weak(new_node.ptr-next, new_node));}~lock_free_stack() {while (pop());}
};
结构体counted_node_ptr的尺寸足够小如果硬件平台支持双字 比较-交换 操作那么std::atomiccounted_node_ptr就属于无锁数据。若不支持那么std::atomic涉及的结构体的尺寸过大无法直接通过原子指令操作便会采用互斥来保证操作原子化。使得“无锁”数据结构和算法成为基于锁的实现。
如果想要缩小结构体counted_node_ptr的尺寸可以采取另一种方法替代假定在硬件平台上指针型别有空余的位。例如硬件寻址空间只有48位指针型别的大小是64位他们可以用来放置计数器借此将counted_node_ptr结构体缩成单个机器字长。
使用分离引用计数的原因我们通过外部引用计数的自增来保证在访问目标节点的过程中其指针依然安全有效。先自增再读取被指涉后自增的值保护了节点不被删除
具体流程详解见《cpp 并发实战》p236
以上实例使用的内存次序是std::memory_order_seq_cst。同步开销较大下面对于内存次序进行优化。
2.5 为无锁栈容器施加内存模型
需要先确认各项操作哪些存在内存次序关系。
1 next指针只是一个未被原子化的普通对象所以为了安全读取其值存储操作必须再载入操作发生之前前者由压入数据的线程执行后者由弹出数据的线程执行。 #include atomic
#include memorytemplatetypename T
class lock_free_stack {
private:struct node;struct counted_node_ptr {int external_count;node* ptr;};struct node {std::shared_ptrT data;std::atomicint internal_count;counted_node_ptr next;node(T const data_) : data(std::make_sharedT(data_)), internal_count(0) {}};std::atomiccounted_node_ptr head;void increase_head_count(counted_node_ptr old_counter) {counted_node_ptr new_counter;do {new_counter old_counter;new_counter.external_count;} while (!head.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed));old_counter.external_count new_counter.external_count;}
public:std::shared_ptrT pop() {counted_node_ptr old_head head.load(std::memory_order_relaxed);for (;;) {increase_head_count(old_head);node* const ptr old_head.ptr;if (!ptr) {return std::shared_ptrT();}if (head.compare_exchange_strong(old_head, ptr-next, std::memory_order_relaxed)) {std::shared_ptrT res;res.swap(ptr-data);int const count_increase old_head.external_count - 2;if (ptr-internal_count.fetch_add(count_increase, std::memory_order_relaxed) -count_increase) { // ?delete ptr;}return res;} else if (ptr-internal_count.fetch_add(-1, std::memory_order_relaxed) 1) {ptr-internal_count.load(std::memory_order_acquire);delete ptr;}}}void push(T const data) {counted_node_ptr new_node;new_node.ptr new node(data);new_node.external_count 1;new_node.ptr-next head.load(std::memory_order_relaxed);while (!head.compare_exchange_weak(new_node.ptr-next, new_node, std::memory_order_release, std::memory_order_relaxed));}~lock_free_stack() {while (pop());}
};
这里的push唯一的原子操作是compare_exchange_weak如果需要在线程间构成先行关系则代码需要一项释放操作因此compare_exchange_weak必须采用std::memory_order_release或者更严格的内存次序。
若compare_exchange_weak执行失败则指针head和new_node均无变化代码继续执行这种情况下使用memory_order_relaxed即可。
没懂为什么是这两个次序 这里的pop访问next指针前进行了额外操作。也就是先调用了increase_head_count()该操作收到memory_order_acquire或者更严格的内存次序约束。因为这里通过原子操作获取的head指针旧值访问next指针。对原子操作失败的情况则使用宽松次序。
因为push中的存储行为是释放操作popincrease_head这里的是获取操作因此存储行为和载入操作同步构成先行关系。因此对于push中的成员指针ptr的存储操作先行发生然后pop才会在increase_head_count()中访问ptr-next代码符合线程安全。
push中的head.load不影响上述内存次序关系的分析
剩余的很多没看懂在《cpp并发实战》p240有空多看看。 2.6 实现线程安全的无锁队列
对于队列其pop和push分别访问不同的部分。
#include atomic
#include memory
#include mutextemplatetypename T
class lock_free_queue {
private:struct node {std::shared_ptrT data;node* next;node() : next(nullptr) {}};std::atomicnode* head;std::atomicnode* tail;std::unique_ptrnode pop_head() {node* const old_head head.load();if (head.get() tail) {return nullptr;}head.store(old_head-next);return old_head;}public:lock_free_queue() : head(new node), tail(head.load()) {}lock_free_queue(const lock_free_queue other) delete;lock_free_queue operator(const lock_free_queue other) delete;~lock_free_queue() {while (node* const old_head head.load()) {head.store(old_head-next);delete old_head;}}std::shared_ptrT pop() {node* old_head pop_head();if (!old_head) {return std::shared_ptrT();}std::shared_ptrT const res(old_head-data);delete old_head;return res;}void push(T new_value) {std::shared_ptrT new_data(std::make_sharedT(std::move(new_value)));node* p new node;node* const old_tail tail.load();old_tail-data.swap(new_data);old_tail-next p;tail.store(p);}
};
tail指针的存储操作storepush的最后一句和载入操作loadpop_head的if里面的条件判断存在同步。 但是若是由多个线程进行操作上述代码就不可行。 其余细节具体实现见书。 3 实现无锁数据结构的原则 3.1 在原型设计中使用std::memory_order_seq_cst次序
它令全部操作形成一个确定的总序列比较好分析。在这种意义上使用其他内存次序就成为了一种优化。 3.2 使用无锁的内存回收方案
无所代码中的内存管理很难。最基本的要求是只要目标对象仍然有可能背其他线程指涉就不删除。
在这里介绍了三种方法来满足及时删除无用对象
1 暂缓所有删除对象的动作等到五线程访问再删除类似gc
2 风险指针
3 引用计数 3.3 防范ABA问题
在所有设计 比较-交换 的算法中都要防范ABA问题。
该问题产生的过程如下
步骤1线程甲读取原子变量x得知其值为A。
步骤2线程甲根据A执行某项操作比如查找或如果x是指针则依据它提取出相关值。
步骤3线程甲因操作系统调度而发生阻塞。
步骤4另一线程对原子变量x执行别的操作将其值改成B。
步骤5又有线程改变了与A相关的数据使得线程甲原本持有的值失效步骤2中的相关值。这种情形也许是A表示某内存地址而改动操作则是释放指针的目标内存或变更目标数据最后将产生严重后果。
步骤6原子变量x再次被某线程改动重新变回A。若x属于指针型别其指向目标可能在步骤5被改换程一个新对象。
步骤7线程甲继续运行在原子变量x上执行 比较-交换 操作与A进行对比。因此 比较-交换 操作成功执行因为x的值仍然为A但A的关联数据却不再有效即原本在步骤2取的相关值已经失效但是线程甲却无从分辨这将破坏数据结构。 该问题最常见的解决方法之一是在原子变量x中引入一个ABA计数器。将变量x和计数器组成单一结构作为一个整体执行 比较-交换 操作。 3.4 找出忙等循环协助其他线程
若两个线程同时执行push操作那么必须等另一个结束才可以继续运行。这是忙等浪费cpu。在本例中将非原子变量的数据成员改为原子变量并采用 比较-交换 操作设置其值。 4 小结
这节很难