做收钱的网站要什么条件,网站建设管理工作小结,wordpress中文用户名,北京网站制作公司兴田德润实惠背景
在实际业务开发中#xff0c;我们会遇到以下场景#xff1a;请求数据库#xff0c;批量获取1000条数据记录后#xff0c;处理数据 为了减少因一次批量获取的数据太多#xff0c;导致的数据库延时增加#xff0c;我们可以把一次请求拆分成多次请求#xff0c;并发去…背景
在实际业务开发中我们会遇到以下场景请求数据库批量获取1000条数据记录后处理数据 为了减少因一次批量获取的数据太多导致的数据库延时增加我们可以把一次请求拆分成多次请求并发去处理当所有的并发请求完成后再继续处理这些返回的数据 golang中的WaitGroup就可以帮助我们实现上述的场景
快速入门
背景开启10个goroutine并发执行等待所有goroutine执行完成后当前goroutine打印执行完成
func TestWaitGroup(t *testing.T) {var wg sync.WaitGroupfor i : 0; i 10; i {index : igo func() {wg.Add(1)defer wg.Done()fmt.Println(fmt.Sprintf(%v 正在执行, index))}()}wg.Wait()fmt.Println(TestWaitGroup method done)
}源码分析
golang版本1.18.2
源码路径src/sync/waitgroup.go
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
// WaitGroup 等待 goroutine 集合完成
// 主 goroutine 调用 Add 设置等待的 goroutine 数量
// 然后每个 goroutine 运行并在完成时调用 Done
// 同时Wait 可以用来阻塞直到所有 goroutine 都完成
type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers only guarantee that 64-bit fields are 32-bit aligned.// For this reason on 32 bit architectures we need to check in state()// if state1 is aligned or not, and dynamically swap the field order if// needed.// 64位值高32位是计数器低32位是waiter计数// 64位原子操作需要64位对齐但32位编译器仅保证64位字段是32位对齐的// 因此在 32 位架构上我们需要在 state() 中检查 state1 是否对齐并在需要时动态“交换”字段顺序state1 uint64state2 uint32
}
noCopyWaitGroup在首次使用后不能被复制 state1state2一共占用12字节保存了三类信息4字节保存goroutine计数4字节保存waiter计数4字节保存信号量 WaitGroup对外提供了以下三个方法
// 设置等待的goroutine数量
func (wg *WaitGroup) Add(delta int)
// goroutine执行完成
func (wg *WaitGroup) Done()
// 阻塞等待所有的goroutine都执行完成
func (wg *WaitGroup) Wait()Add
// state returns pointers to the state and sema fields stored within wg.state*.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if unsafe.Alignof(wg.state1) 8 || uintptr(unsafe.Pointer(wg.state1))%8 0 {// state1 is 64-bit aligned: nothing to do.return wg.state1, wg.state2} else {// state1 is 32-bit aligned but not 64-bit aligned: this means that// (state1)4 is 64-bit aligned.state : (*[3]uint32)(unsafe.Pointer(wg.state1))return (*uint64)(unsafe.Pointer(state[1])), state[0]}
}// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 将 delta可能为负添加到 WaitGroup 计数器。
// 如果计数器变为零则所有在 Wait 上阻塞的 goroutine 都会被释放。
// 如果计数器变为负数则添加panic。
// 请注意计数器为零时发生的具有正增量的调用必须在等待之前发生。
// 具有负增量的调用或在计数器大于零时开始的具有正增量的调用可能随时发生。
// 通常这意味着对 Add 的调用应该在创建 goroutine 或其他要等待的事件的语句之前执行。
// 如果重用一个 WaitGroup 来等待几个独立的事件集新的 Add 调用必须在所有先前的 Wait 调用返回后发生。
func (wg *WaitGroup) Add(delta int) {statep, semap : wg.state()if race.Enabled {_ *statep // trigger nil deref earlyif delta 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}// 记录goroutine计数state : atomic.AddUint64(statep, uint64(delta)32)// 获取goroutine计数v : int32(state 32)// 获取waiter计数w : uint32(state)if race.Enabled delta 0 v int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(semap))}// goroutine计数小于0if v 0 {panic(sync: negative WaitGroup counter)}// w ! 0说明已经执行了Wait且还有阻塞等待的goroutine此时不允许在执行Addif w ! 0 delta 0 v int32(delta) {panic(sync: WaitGroup misuse: Add called concurrently with Wait)}// 存在没有执行完成的goroutine或者当前没有waiter直接返回if v 0 || w 0 {return}// This goroutine has set counter to 0 when waiters 0.// Now there cant be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter 0.// Still do a cheap sanity check to detect WaitGroup misuse.// 此时goroutine计数为0且waiter计数大于0不然上一步就返回了// 现在以下状态不能同时发生// 1. 并发调用Add和Wait// 2. 当goroutine计数为0时Wait不会继续增加waiter计数// 仍然做一个廉价的健全性检查来检测 WaitGroup 的滥用防止以上情况发生if *statep ! state {panic(sync: WaitGroup misuse: Add called concurrently with Wait)}// Reset waiters count to 0.// 重置waiter计数*statep 0// 唤醒所有的waiterfor ; w ! 0; w-- {runtime_Semrelease(semap, false, 0)}
}delta代表本次需要记录的goroutine计数可能为负数 64位原子操作需要64位对齐但32位编译器仅保证64位字段是32位对齐的 当state1是64位对齐时state1高32位是goroutine计数低32位是waiter计数 当state1不是64位对齐时动态“交换”字段顺序 记录goroutine计数的变化delta 如果goroutine计数小于0则直接panic 如果已经执行了Wait且还有阻塞等待的goroutine此时不允许在执行Add 如果存在没有执行完成的goroutine或者当前没有waiter直接返回 当goroutine计数为0且waiter计数大于0时现在以下状态不能同时发生
并发调用Add和Wait 当goroutine计数为0时Wait不会继续增加waiter计数
简单校验通过后重置waiter计数为0唤醒所有阻塞等待的waiter
Done
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}调用Adddelta -1代表goroutine计数-1
Wait
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {statep, semap : wg.state()if race.Enabled {_ *statep // trigger nil deref earlyrace.Disable()}for {state : atomic.LoadUint64(statep)// 获取goroutine计数v : int32(state 32)// 获取waiter计数w : uint32(state)// goroutine计数为0不需要等待直接返回if v 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.// waiter计数1if atomic.CompareAndSwapUint64(statep, state, state1) {if race.Enabled w 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(semap))}// 阻塞等待goroutine计数为0后唤醒继续执行runtime_Semacquire(semap)// Wait还没有执行完成就开始复用WaitGroupif *statep ! 0 {panic(sync: WaitGroup is reused before previous Wait has returned)}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}调用state()保证字段内存对齐 如果goroutine计数为0不需要等待直接返回 尝试对waiter计数1若失败则继续下一轮重试 对waiter计数1成功则阻塞当前goroutine等待goroutine计数为0后唤醒继续执行 唤醒继续执行后简单判断是否存在Wait还没有执行完成就开始复用WaitGroup的情况如果有则panic如果没有则直接返回