方便的并发,是Golang的一大特色优势,而使用并发,对sync包的WaitGroup不会陌生。WaitGroup主要用来做Golang并发实例即Goroutine的等待,当使用go启动多个并发程序,通过waitgroup可以等待所有go程序结束后再执行后面的代码逻辑,比如:

func Main() {
wg := sync.WaitGroup{}
for i := ; i < ; i++ {
wg.Add()
go func() {
defer wg.Done()
time.Sleep( * time.Second)
}() }
wg.Wait() // 等待在此,等所有go func里都执行了Done()才会退出
}

WaitGroup对外提供三个方法,Add(int),Done()和Wait(), 其中Done()是调用了Add(-1),一般使用方法是,先统一Add,在goroutine里并发的Done,然后Wait。

WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。

每次Add执行,请求计数器v加1,Done方法执行,请求计数器减1,v为0时通过信号量唤醒Wait()。

那么等待计数器拿来干嘛?是因为同一个实例的Wait()方法支持多处调用,每一次Wait()方法执行,等待计数器 w 就会加1,而当请求计数器v为0触发Wait()时,要根据w的数量发送w份的信号量,正确的触发所有的Wait(),这虽然不是常用的一个特性,但是在一些特殊场合是有用处的(比如多个并发都依赖于WaitGroup的实例的结束信号来进行下一个action),演示代码如下:

func main() {
wg := sync.WaitGroup{}
for i := ; i < ; i++ {
wg.Add()
go func() {
defer wg.Done()
​ }()
}
time.Sleep( * time.Second)
for j := ; j < ; j++ {
go func(i int) {
// 3个地方调用Wait(),通过等待j计时器,每个Wati都会被hu唤醒
wg.Wait()
fmt.Println("wait done now ", i)
}(j)
}
time.Sleep( * time.Second)
return
}
/*
输出如下,数字出现的顺序随机
wait done now 1
wait done now 0
wait done now 2
*/

同时,WaitGroup里还对使用逻辑进行了严格的检查,比如Wait()一旦开始不能Add().

下面是带注释的代码,去掉了不影响代码逻辑的trace部分:

func (wg *WaitGroup) Add(delta int) {
statep := wg.state()
// 更新statep,statep将在wait和add中通过原子操作一起使用
state := atomic.AddUint64(statep, uint64(delta)<<)
v := int32(state >> )
w := uint32(state)
if v < {
panic("sync: negative WaitGroup counter")
}
if w != && delta > && v == int32(delta) {
// wait不等于0说明已经执行了Wait,此时不容许Add
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 正常情况,Add会让v增加,Done会让v减少,如果没有全部Done掉,此处v总是会大于0的,直到v为0才往下走
// 而w代表是有多少个goruntine在等待done的信号,wait中通过compareAndSwap对这个w进行加1
if v > || w == {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
// 当v为0(Done掉了所有)或者w不为0(已经开始等待)才会到这里,但是在这个过程中又有一次Add,导致statep变化,panic
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 将statep清0,在Wait中通过这个值来保护信号量发出后还对这个Waitgroup进行操作
*statep =
// 将信号量发出,触发wait结束
for ; w != ; w-- {
runtime_Semrelease(&wg.sema, false)
}
} // Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-)
} // Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> )
w := uint32(state)
if v == {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
// 如果statep和state相等,则增加等待计数,同时进入if等待信号量
// 此处做CAS,主要是防止多个goroutine里进行Wait()操作,每有一个goroutine进行了wait,等待计数就加1
// 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
if atomic.CompareAndSwapUint64(statep, state, state+) {
if race.Enabled && w == {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
// 等待信号量
runtime_Semacquire(&wg.sema)
// 信号量来了,代表所有Add都已经Done
if *statep != {
// 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}

最新文章

  1. RunLoop
  2. Spring中配置和读取多个Properties文件--转
  3. go语言的selector
  4. 平衡二叉树,AVL树之图解篇
  5. 自己使用Fresco时遇到的相关问题
  6. 各种类型的Writable(Text、ByteWritable、NullWritable、ObjectWritable、GenericWritable、ArrayWritable、MapWritable、SortedMapWritable)转
  7. 淘宝HSF服务的原理以及简单的实现
  8. 用BroadcastReceiver监听网络状态的变化
  9. js深入研究之Person类案例
  10. Servlet的学习笔记
  11. hdu1024
  12. JavaScript操作符汇总
  13. 【一天一道LeetCode】#21. Merge Two Sorted Lists
  14. 【TCP协议】(3)---TCP粘包黏包
  15. 关于word-break和word-wrap的使用和区别
  16. jQuery手机触屏拖动滑块验证跳转插件
  17. 【原创】大数据基础之Spark(3)Spark Thrift实现原理及代码实现
  18. Servlet+jSP+java实现商品信息和流水的操作
  19. YTKNetwork源码详解
  20. dart实例

热门文章

  1. 对Java中可变参数的理解
  2. 两个大数相乘 - 高精度FFT
  3. Nginx的一理解(1)
  4. scratch3.0二次开发scratch3.0基本介绍(第一章)
  5. &lt;s:select&gt;自动加标签
  6. 代码审计之CVE-2018-7600-Drupal远程代码执行漏洞-Render API
  7. Solution: 题解 CF1196E Connected Component on a Chessboard
  8. HttpClient介绍和使用
  9. C语言博客作业8
  10. 压力测试---Jemeter的使用