原帖见Go sync.Cond, the Most Overlooked Sync Mechanism。
文中对
Signal()
原理的阐述相当精彩。BTW,我目前要做的一些开发工作和VictoriaMetrics性能监控强相关,未来会写一些VM和Prometheus的介绍博客也说不定……
sync.Cond
是Go语言的一种同步原语,但不像sync.Mutex
或sync.WaitGroup
那样被广泛使用。我们一般很少在项目代码,甚至标准库代码中用到它,反而倾向使用别的同步机制替代。
尽管如此,作为一个Go开发工程师,你应该不太会想在读用了sync.Cond
的代码时理解困难,毕竟它也是标准库的一部分呀。
那么这篇文章就能帮你弭平差距,甚至让你清晰地理解条件变量实际上是怎么工作的。
什么是sync.Cond?
现在我们开始拆解sync.Cond
。
当一个goroutine需要等待特定事件发生,比如某些共享数据的更新,它可以**“阻塞”**,也就是暂停工作,直到得到了恢复运行的消息。要实现这一点,最基本的方法是使用循环,或许再加一句time.Sleep
,防止CPU大量空转。
这种方法大概是这样的:
// 等待直到条件为true
for !condition {
}
// 或者这样
for !condition {
time.Sleep(100 * time.Millisecond)
}
但这样做其实不是特别高效,因为循环仍然在后台运行,甚至在无事发生的时候还在占用着CPU循环。
所以我们就需要使用sync.Cond
,它提供了一种更好的手段。如果我们更学术一点,我们应该称其为“条件变量”。
- 当一个goroutine在等待某事发生时(等待特定条件变为真),它可以调用
Wait()
。 - 另一个goroutine,一旦发现条件被满足,可以调用
Signal()
或Broadcast()
来唤醒等待的goroutine,让它们继续工作。
以下是sync.Cond
提供的基本接口:
// 在条件满足之前阻塞调用它的goroutine
func (c *Cond) Wait() {}
// 如果有的话,唤醒一个正在等待的goroutine
func (c *Cond) Signal() {}
// 唤醒所有正在等待的goroutine
func (c *Cond) Broadcast() {}
来看一个例子。假设我们在等待一种特定的宝可梦,它出现的时候我们想要提醒别的goroutine。
var pokemonList = []string{"Pikachu", "Charmander", "Squirtle", "Bulbasaur", "Jigglypuff"}
var cond = sync.NewCond(&sync.Mutex{})
var pokemon = ""
func main() {
// 消费者
go func() {
cond.L.Lock()
defer cond.L.Unlock()
// 一直等到皮卡丘出现
for pokemon != "Pikachu" {
cond.Wait()
}
println("Caught" + pokemon)
pokemon = ""
}()
// 生产者
go func() {
// 每1ms随机出现一只宝可梦
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond)
cond.L.Lock()
pokemon = pokemonList[rand.Intn(len(pokemonList))]
cond.L.Unlock()
cond.Signal()
}
}()
time.Sleep(100 * time.Millisecond) // lazy wait
}
// Output:
// Caught Pikachu
在这个例子里,一个goroutine在等待皮卡丘出现,另一个从列表里随机选择一只宝可梦,并告知消费者。
当生产者发送信号时,消费者被唤醒,并检查是不是出现的要等待的那只宝可梦。如果是,我们就抓住它,反之我们继续等待下一只出现。
问题是,在生产者发送信号和消费者被唤醒之间是有一个时间差的。此时,出现的宝可梦可能改变,因为消费者被唤醒花费的时间可能多于1ms,又或者别的goroutine改变了共享的宝可梦数据。因此,sync.Cond
实际上再说:“嘿!有事情发生啦!快醒过来看看,但你要是赖床,事情可能又要变啦。”
要是消费者被唤醒得太迟,宝可梦可能会逃跑,它就不得不重新阻塞。
可我明明可以用channel向其他的goroutine发送宝可梦名称或者信号呀
这当然没问题。事实上,channel常被用来替代sync.Go
,就是因为它更简单、更惯用,大多数开发者更熟悉。
在上面的例子里,你可以简单实用一个channel发送宝可梦的名字,或者用一个空结构体struct{}
发送信号,而不传递任何数据。但我们的问题不只是用channel传递消息,而是如何处理共享状态。
我们的例子很简单,但要是多个goroutine同时访问共享的宝可梦变量,使用channel会导致什么呢?
- 要是我们用channel传递宝可梦名称,我们就还要用一把互斥锁保护共享变量。
- 如果我们仅使用channel发送信号,我们仍然需要互斥锁管理共享状态。
- 如果我们在生产者里确认宝可梦是皮卡丘,再把它发送给channel,我们还是需要互斥锁。更何况,这种做法违反了分工解耦的设计原则:生产者在处理属于消费者的逻辑。
综上,只要是多个goroutine同时改变共享数据的场景,我们就需要互斥锁来保护共享状态。所以我们经常看到channel和mutex搭配使用的方式,以确保正确同步和数据安全。
行吧,那广播信号又是怎么一回事呢?
好问题!你确实可以通过关闭channel的方式(close(ch)
)广播信号给所有等待的goroutine。当你这样做的时候,所有从该channel接收消息的goroutine都会得到提醒。但当心,被关闭的channel是不能复用的,一旦关闭就永远关闭了。
BTW,在Go 2里面一直有移除sync.Cond
的讨论:proposal: sync: remove the Cond type。
那既然如此,
sync.Cond
到底擅长做什么呢?
在一些场景里,sync.Cond
会比channel更合适:
- 你可以用channel给goroutine发送信号(给channel传值),或者通知所有的goroutine(关闭channel),但你不能同时做这两件事。
sync.Cond
支持更细粒度的控制。你能用Signal()
唤醒单独的goroutine,或者用Broadcast()
唤醒全部goroutine。 - 此外,你还可以无限次使用
Broadcast()
,但你只能关闭一次channel,再关闭一次就panic了。 - channel不支持保护共享数据,你必须额外使用互斥锁。但
sync.Cond
却在同一个包里继承了锁机制和信号机制(性能也更好)。
为什么
sync.Cond
要内嵌一个锁?
理论上,sync.Cond
这样的条件变量没必要附加锁。
用户完全可以在条件变量之外自行管理锁,这样听起来更灵活。这样设计并非出自技术限制,而是为了避免人为错误。
人为管理锁很容易出错,因为条件变量的使用方式不直观:你得先解锁,再Wait()
,然后在goroutine被唤醒时再拿锁。这个过程有几分尴尬,而且容易出错(忘了拿锁,或者在错误的时机解锁)。
但为什么这种锁机制看上去这么古怪呢?
通常,调用cond.Wait()
的goroutine需要在循环中检查一些贡献状态,比如:
for !checkSomeSharedState() {
cond.Wait()
}
把锁嵌入sync.Cond
能帮我们处理前述加锁/解锁的过程,代码会更清晰,不容易出错,后文我们会详细讨论。
怎么使用sync.Cond?
如果你认真看了前面的例子,不难发现消费者里的使用模式:我们总是获取互斥锁后再等待(.Wait()
)条件满足;我们总在条件满足后再解锁。
除此之外,我们把等待条件写进循环里:
// 消费者
go func() {
cond.L.Lock()
defer cond.L.Unlock()
// 等待皮卡丘出现
for pokemon != "Pikachu" {
cond.Wait()
}
println("Caught" + pokemon)
}()
Cond.Wait()
当我们调用sync.Cond
的Wait()
方法时,我们是在命令当前goroutine阻塞直至某个条件满足。
这幕后其实发生了这些事:
- goroutine被添加到一个等待列表中(记录了等待相同条件的goroutine),这些goroutine都被阻塞,也即,除非被
Singnal()
或Broadcast()
唤醒,不得继续工作。 - 关键部分:互斥锁必须在
Wait()
之前上锁,因为Wait()
会在goroutine沉睡前自动释放锁(调用Unlock()
)。这将允许其它goroutine获取锁,并在之前goroutine等待时完成它们的工作。 - 当goroutine被唤醒时(通过
Signal()
或Broadcast()
),它会先重新获取锁(Lock()
),再工作。
Wait()
底层是这样工作的:
func (c *Cond) Wait() {
// 检查Cond实例是否有被复制
c.checker.check()
// 获取ticket
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 阻塞goroutine
runtime_notifyListWait(&c.notify, t)
// 重新拿锁
c.L.Lock()
}
尽管这个过程不难,我们还是总结出了4点:
- checker用于防止
Cond
实例被复制,如果这样,它会panic。 - 调用
cond.Wait()
会立即释放互斥锁,故互斥锁在cond.Wait()
被调用之前必须被获取到,否则会panic。 - 在被唤醒之后,
cond.Wait()
会重新获取互斥锁,这表明完成了对共享数据的操作后需要再次将之解锁。 sync.Cond
的大部分功能都基于内部数据结构notifyList
实现,这个结构使用基于ticket的通知方式。
因为存在这种加锁/解锁的行为,你在使用sync.Cond.Wait()
时必须遵从特定规范以免出错:
c.L.Lock()
for !condition() {
c.Wait()
}
// 基于条件执行任务
c.L.Unlock()
为什么不直接使用
c.Wait()
,非要在循环里面用?
当Wait()
返回时,我们不能假定等待的条件立刻为真。在我们的goroutine被唤醒时,别的goroutine可能已经操作了共享状态,导致条件为假。因此,为了应付这种情况,我们把Wait()
放进循环。
在宝可梦的例子里,我们也提及过这个问题。
循环持续检查条件,只有条件为真时goroutine才能跳出来。
Cond.Signal()
& Cond.Broadcast()
Signal()
用于唤醒一个正在等待条件变量的goroutine。
- 如果没有在等待的goroutine,
Signal()
不会做任何操作。 - 如果有goroutine等待,
Signal()
会唤醒队列中的第一个goroutine。所以,如果你启动了大量goroutine,从0到n,只有第0个会被Signal()
唤醒。
我们过一个例子:
func main() {
cond := sync.NewCond(&sync.Mutex{})
for i := range 10 {
go func(i int) {
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait()
fmt.Println(i)
}(i)
time.Sleep(time.Millisecond)
}
time.Sleep(100 * time.Millisecond) // 等待goroutine启动
cond.Signal()
time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}
// Output:
// 0
Signal()
被用来唤醒一个goroutine,告诉它条件可能被满足。它的实现是这样的:
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
你不需要在调用Signal()
之前拿锁,但这么做通常是个好主意,尤其是你想修改共享数据,而这份数据又在被并发访问的情况下。
那cond.Broadcast()
呢?
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Broadcast()
被调用时唤醒所有的goroutine,将它们从队列清除。它的内部逻辑很简单,隐藏在runtime_notifyListNotifyAll()
里。
func main() {
cond := sync.NewCond(&sync.Mutex{})
for i := range 10 {
go func(i int) {
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait()
fmt.Println(i)
}(i)
}
time.Sleep(100 * time.Millisecond) // 等待goroutine启动
cond.Broadcast()
time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}
// Output:
// 8
// 6
// 3
// 2
// 4
// 5
// 1
// 0
// 9
// 7
此时,所有的goroutine在100毫秒内被唤醒,但顺序无法保证。
当Broadcast()
被调用时,它把所有等待的goroutine标记为准备运行,但它们不会立刻启动,而是被Go调度器的底层算法选择,其顺序是不可预测的。
内部原理
我们所有的Go博客里都有一节内部原理。理解设计思路和其解决的问题通常很有意义。
Copy checker
sync包里的copyChecker
被用来检查Cond
对象在初次使用后被复制的情况。这里“初次使用”包括Wait()
、Signal()
以及Broadcast()
。
如果Cond
在这些操作后被复制,程序会panic:“sync.Cond is copied”。
在sync.WaitGroup
和sync.Pool
里也有类似的设计,它们使用noCopy
字段防止复制,但这些例子里不会有panic发生。
此处的copyChecker
事实上只是一个uintptr
,即保存内存地址的整型,它是这样工作的:
- 初次使用
sync.Cond
之后,copyChecker
保存它自己的内存地址,指向cond.copyChecker
对象。 - 如果该对象被拷贝,
copyChecker
的地址(&cond.copyChecker
)会发生变化(因为新的拷贝位于不同的内存地址),但copyChecker
包含的uintptr
不会变。
这里的检查很简单:对比copyChecker
地址和uintptr
保存的值,要是不同就panic。
即使逻辑很简单,如果不熟悉Go的原子操作和unsafe包的话,其实现看起来会有点古怪:
// copyChecker 保存指向自己的指针来检查对象拷贝
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
由于第一项和最后一项检查基本相同,我们可以将上述代码拆解成两个主要的检查步骤。
第一项检查,uintptr(*c) != uintptr(unsafe.Pointer(c))
,确认内存地址是否发生变化,如果是,则对象有被拷贝。但这里有一个问题,如果copyChecker
是第一次被使用,由于还未初始化,两边都是零值。
第二项检查,!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c)))
,使用CAS操作同时处理初始化和检查任务:
- 如果CAS成功,说明
copyChecker
刚刚被初始化,故对象还没有被复制。 - 如果CAS失败,说明
copyChecker
已经初始化,那么我们就需要进行最终检查uintptr(*c) != uintptr(unsafe.Pointer(c))
,以确认对象未被拷贝。
最后一次检查(和第一次检查完全一致),确保对象在上面的操作之后也没有被复制。
为什么最后还有一次检查?两次检查还不够吗?
第三次检查的原因是前两次检查操作不是原子性的。
如果检查时copyChecker
是第一次使用,此时它还没有被初始化,为零值。如果这样,前两次检查就会错误通过,尽管对象没有被拷贝,但它也没有被初始化。
notifyList:基于tick的提醒列表
除了锁和复制检查机制,sync.Cond
的另一个重要部分就是notifyList
。
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock uintptr
head unsafe.Pointer
tail unsafe.Pointer
}
如今,sync包和运行时包的notifyList
不一样,但使用了相同的名字和内存布局(这是故意为之)。为了理解其运行逻辑,我们得看看运行时包中的版本:
type notifyList struct {
wait atomic.Uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}
如果你看到head和tail,你大概会猜这是某种链表的实现,那你就猜对了。这是一个sudog链表(含义是伪goroutine,pseudo-goroutine),代表着等待同步事件的goroutine,比如等待从channel收发数据,等待一个条件变量。
head
和tail
是链表中首尾两个goroutine。同时,wait
和notify
字段是持续增长的ticket号,代表队列中的一个位置。
wait
:代表要发放给等待状态goroutine的下一个ticket。notify
:代表下一个应该被通知,或者说唤醒的位置。
这就是notifyList
背后的核心思想了,结合起来之后是这样的。
notifyListAdd()
当一个goroutine将要等待一次通知时,它会调用notifyListAdd()
来获取ticket。
func (c *Cond) Wait() {
c.checker.check()
// 获取ticket
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 将goroutine加入队列并阻塞
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}
ticket分配由一个原子计数器负责。因此,当一个goroutine的调用notifyListAdd()
时,计数器递增,goroutine会拿到下一个可用的ticket号。
每一个goroutine都能拿到一个唯一的ticket号,这一过程无需任何锁。这也意味着多个goroutine能够同时请求ticket,无需等待。
例如,如果ticket计数器当前指向5,调用notifyListAdd()
的goroutine会拿到ticket号5,计数器会自增到6,等待下一次调用。wait
字段总是指向下一个被发放的ticket号。
但这里事情稍微有些绕了。
由于多个goroutine能够同时获取ticket,它们调用notifyListAdd()
和进入notifyListWait()
的时机之间有些微间隙。所以尽管ticket号是递增发放的,它们的顺序也不能被保证。goroutine加入链表的顺序可能是1, 2, 3
,但最后的顺序可能是3, 2, 1
或者2, 1, 3
,这都取决于间隔时间。
拿到ticket之后,goroutine的下一步就是等待被通知。这发生在goroutine调用notifyListWait(t)
之时,t
就是它刚刚获取的ticket号。
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// 如果这张ticket已经被通知,立刻返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 加入链表
s := acquireSudog()
...
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
...
releaseSudog(s)
}
首先,goroutine检查自己的ticket是不是已经被通知。
它对比自己的ticket(t
)和当前的通知号。如果通知号已经大于自己的ticket,他就没必要等待了,会直接跳转到共享资源开始工作。
这种快速检查很重要,特别是在我们分开Signal()
和Broadcast()
的情况下。但如果goroutine的ticket还没被通知,它就把自己加入链表,进入睡眠状态,直到被通知。
notifyListNotifyOne()
当需要通知等待中的goroutine时,系统从当前未被通知的最小ticket号开始,这由l.notify
追踪。
func notifyListNotifyOne(l *notifyList) {
// 快速检查:要是没人等待,直接退出
if l.wait.Load() == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// 再次检查,确保真的需要执行后续逻辑
t := l.notify
if t == l.wait.Load() {
unlock(&l.lock)
return
}
// 更新到下一个需要通知的ticket
atomic.Store(&l.notify, t+1)
// 在链表中找到对应的goroutine
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
// 找到了匹配的goroutine
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4) // 标记goroutine为就绪状态
return
}
}
unlock(&l.lock)
}
还记得我们说过ticket不保证顺序?
我们可能有goroutine的ticket分别是2, 1, 3
,但通知号总是顺序递增的。所以,当系统要唤醒一个goroutine时,就需要遍历链表,找到拿着下一张ticket的goroutine。一旦找到了,它就把它从链表中移除,将之标记为就绪状态。
但这里还有些有趣的地方,有时候这里还会有时机问题。假设一个goroutine拿到了ticket,但是在这个函数运行时还没有被加入链表。
接下来会发生什么?比如,代码执行顺序是:notifyListAdd()
-> notifyListNotifyOne()
-> notifyListWait()
。
这种情况下,函数遍历链表,但没有找到拿着对应ticket的goroutine。但是不要担心,notifyListWait()
会在goroutine调用它时处理这种情况。
还记得前面提到的一次重要检查吗?在notifyListWait()
里:if less(t, l.notify) {...}
。
这项检查很重要,因为它允许一个ticket号小于l.notify
的goroutine立刻就绪。此时,goroutine跳过等待阶段,直接访问共享资源。
所以,即使goroutine还没有加入链表,只要它拿着有效的ticket,它就也能被通知到。这让整个设计变得非常丝滑,每个goroutine都能立刻拿到ticket,不需要额外等待。一切都不会被阻塞。
notifyListNotifyAll()
现在我们来看最后一部分,Broadcast
或者notifyListNoftifyAll()
。和notifyListNotifyOne()
相比,这个函数要简单得多:
func notifyListNotifyAll(l *notifyList) {
// 快速检查:没人等待时立刻退出
if l.wait.Load() == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, l.wait.Load())
unlock(&l.lock)
// 将链表中的所有goroutine标记为就绪
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
这段代码特简单,我想你已经看明白了。基本上,Broadcast()
把链表中所有的goroutine标记为就绪,然后清空整个链表。
最后我们用一个警告来结尾:我们很容易就会误用sync.Cond
,引入一些棘手的,很难debug的问题。了解了技术实现后,我建议看看工程师角度的讨论:proposal:sync:remove the Cond type