sync.Cond:被忽视的同步机制
Golang条件变量的概念在过去很长一段时间一直困扰着我,好像它能派上用场的地方,用经典的channel也没有问题。VictoriaMetrics这篇post很大程度上释清了我的困惑,我将它翻译过来,希望对其它Golang新手有所帮助。
原帖见 Go sync.Cond, the Most Overlooked Sync Mechanism。
文中对
Signal()原理的阐述相当精彩。BTW,我目前要做的一些开发工作和 VictoriaMetrics 性能监控强相关,未来会写一些 VM 和 Prometheus 的介绍博客也说不定……
sync.Cond 是 Go 语言的一种同步原语,但不像 sync.Mutex 或 sync.WaitGroup 那样被广泛使用。我们一般很少在项目代码,甚至标准库代码中用到它,反而倾向使用别的同步机制替代。
尽管如此,作为一个 Go 开发工程师,你应该不太会想在读用了 sync.Cond 的代码时理解困难,毕竟它也是标准库的一部分呀。
那么这篇文章就能帮你弭平差距,甚至让你清晰地理解条件变量实际上是怎么工作的。
什么是 sync.Cond?
现在我们开始拆解 sync.Cond 。
当一个 goroutine 需要等待特定事件发生,比如某些共享数据的更新,它可以 **“阻塞”**,也就是暂停工作,直到得到了恢复运行的消息。要实现这一点,最基本的方法是使用循环,或许再加一句 time.Sleep ,防止 CPU 大量空转。
这种方法大概是这样的:
// 等待直到条件为true
for !condition {
}
// 或者这样
for !condition {
time.Sleep(100 * time.Millisecond)
}但这样做其实不是特别高效,因为循环仍然在后台运行,甚至在无事发生的时候还在占用着 CPU 循环。
所以我们就需要使用 sync.Cond ,它提供了一种更好的手段。如果我们更学术一点,我们应该称其为 “条件变量”。
- 当一个 goroutine 在等待某事发生时(等待特定条件变为真),它可以调用
Wait()。 - 另一个 goroutine,一旦发现条件被满足,可以调用
Signal()或Broadcast()来唤醒等待的 goroutine,让它们继续工作。
以下是 sync.Cond 提供的基本接口:
// 在条件满足之前阻塞调用它的goroutine
func (c *Cond) Wait() {}
// 如果有的话,唤醒一个正在等待的goroutine
func (c *Cond) Signal() {}
// 唤醒所有正在等待的goroutine
func (c *Cond) Broadcast() {}
来看一个例子。假设我们在等待一种特定的宝可梦,它出现的时候我们想要提醒别的 goroutine。
var pokemonList = []string{"Pikachu", "Charmander", "Squirtle", "Bulbasaur", "Jigglypuff"}
var cond = sync.NewCond(&sync.Mutex{})
var pokemon = ""
func main() {
// 消费者
go func() {
cond.L.Lock()
defer cond.L.Unlock()
// 一直等到皮卡丘出现
for pokemon != "Pikachu" {
cond.Wait()
}
println("Caught" + pokemon)
pokemon = ""
}()
// 生产者
go func() {
// 每1ms随机出现一只宝可梦
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond)
cond.L.Lock()
pokemon = pokemonList[rand.Intn(len(pokemonList))]
cond.L.Unlock()
cond.Signal()
}
}()
time.Sleep(100 * time.Millisecond) // lazy wait
}
// Output:
// Caught Pikachu在这个例子里,一个 goroutine 在等待皮卡丘出现,另一个从列表里随机选择一只宝可梦,并告知消费者。
当生产者发送信号时,消费者被唤醒,并检查是不是出现的要等待的那只宝可梦。如果是,我们就抓住它,反之我们继续等待下一只出现。
问题是,在生产者发送信号和消费者被唤醒之间是有一个时间差的。此时,出现的宝可梦可能改变,因为消费者被唤醒花费的时间可能多于 1ms,又或者别的 goroutine 改变了共享的宝可梦数据。因此, sync.Cond 实际上再说:“嘿!有事情发生啦!快醒过来看看,但你要是赖床,事情可能又要变啦。”
要是消费者被唤醒得太迟,宝可梦可能会逃跑,它就不得不重新阻塞。
可我明明可以用 channel 向其他的 goroutine 发送宝可梦名称或者信号呀
这当然没问题。事实上,channel 常被用来替代 sync.Go ,就是因为它更简单、更惯用,大多数开发者更熟悉。
在上面的例子里,你可以简单实用一个 channel 发送宝可梦的名字,或者用一个空结构体 struct{} 发送信号,而不传递任何数据。但我们的问题不只是用 channel 传递消息,而是如何处理共享状态。
我们的例子很简单,但要是多个 goroutine 同时访问共享的宝可梦变量,使用 channel 会导致什么呢?
- 要是我们用 channel 传递宝可梦名称,我们就还要用一把互斥锁保护共享变量。
- 如果我们仅使用 channel 发送信号,我们仍然需要互斥锁管理共享状态。
- 如果我们在生产者里确认宝可梦是皮卡丘,再把它发送给 channel,我们还是需要互斥锁。更何况,这种做法违反了分工解耦的设计原则:生产者在处理属于消费者的逻辑。
综上,只要是多个 goroutine 同时改变共享数据的场景,我们就需要互斥锁来保护共享状态。所以我们经常看到 channel 和 mutex 搭配使用的方式,以确保正确同步和数据安全。
行吧,那广播信号又是怎么一回事呢?
好问题!你确实可以通过关闭 channel 的方式( close(ch) )广播信号给所有等待的 goroutine。当你这样做的时候,所有从该 channel 接收消息的 goroutine 都会得到提醒。但当心,被关闭的 channel 是不能复用的,一旦关闭就永远关闭了。
BTW,在 Go 2 里面一直有移除 sync.Cond 的讨论:proposal: sync: remove the Cond type。
那既然如此,
sync.Cond到底擅长做什么呢?
在一些场景里, sync.Cond 会比 channel 更合适:
- 你可以用 channel 给 goroutine 发送信号(给 channel 传值),或者通知所有的 goroutine(关闭 channel),但你不能同时做这两件事。
sync.Cond支持更细粒度的控制。你能用Signal()唤醒单独的 goroutine,或者用Broadcast()唤醒全部 goroutine。 - 此外,你还可以无限次使用
Broadcast(),但你只能关闭一次 channel,再关闭一次就 panic 了。 - channel 不支持保护共享数据,你必须额外使用互斥锁。但
sync.Cond却在同一个包里继承了锁机制和信号机制(性能也更好)。
为什么
sync.Cond要内嵌一个锁?
理论上, sync.Cond 这样的条件变量没必要附加锁。
用户完全可以在条件变量之外自行管理锁,这样听起来更灵活。这样设计并非出自技术限制,而是为了避免人为错误。
人为管理锁很容易出错,因为条件变量的使用方式不直观:你得先解锁,再 Wait() ,然后在 goroutine 被唤醒时再拿锁。这个过程有几分尴尬,而且容易出错(忘了拿锁,或者在错误的时机解锁)。
但为什么这种锁机制看上去这么古怪呢?
通常,调用 cond.Wait() 的 goroutine 需要在循环中检查一些贡献状态,比如:
for !checkSomeSharedState() {
cond.Wait()
}把锁嵌入 sync.Cond 能帮我们处理前述加锁 / 解锁的过程,代码会更清晰,不容易出错,后文我们会详细讨论。
怎么使用 sync.Cond?
如果你认真看了前面的例子,不难发现消费者里的使用模式:我们总是获取互斥锁后再等待( .Wait() )条件满足;我们总在条件满足后再解锁。
除此之外,我们把等待条件写进循环里:
// 消费者
go func() {
cond.L.Lock()
defer cond.L.Unlock()
// 等待皮卡丘出现
for pokemon != "Pikachu" {
cond.Wait()
}
println("Caught" + pokemon)
}()Cond.Wait()
当我们调用 sync.Cond 的 Wait() 方法时,我们是在命令当前 goroutine 阻塞直至某个条件满足。
这幕后其实发生了这些事:
- goroutine 被添加到一个等待列表中(记录了等待相同条件的 goroutine),这些 goroutine 都被阻塞,也即,除非被
Singnal()或Broadcast()唤醒,不得继续工作。 - 关键部分:互斥锁必须在
Wait()之前上锁,因为Wait()会在 goroutine 沉睡前自动释放锁(调用Unlock())。这将允许其它 goroutine 获取锁,并在之前 goroutine 等待时完成它们的工作。 - 当 goroutine 被唤醒时(通过
Signal()或Broadcast()),它会先重新获取锁(Lock()),再工作。

Wait() 底层是这样工作的:
func (c *Cond) Wait() {
// 检查Cond实例是否有被复制
c.checker.check()
// 获取ticket
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 阻塞goroutine
runtime_notifyListWait(&c.notify, t)
// 重新拿锁
c.L.Lock()
}尽管这个过程不难,我们还是总结出了 4 点:
- checker 用于防止
Cond实例被复制,如果这样,它会 panic。 - 调用
cond.Wait()会立即释放互斥锁,故互斥锁在cond.Wait()被调用之前必须被获取到,否则会 panic。 - 在被唤醒之后,
cond.Wait()会重新获取互斥锁,这表明完成了对共享数据的操作后需要再次将之解锁。 sync.Cond的大部分功能都基于内部数据结构notifyList实现,这个结构使用基于 ticket 的通知方式。
因为存在这种加锁 / 解锁的行为,你在使用 sync.Cond.Wait() 时必须遵从特定规范以免出错:
c.L.Lock()
for !condition() {
c.Wait()
}
// 基于条件执行任务
c.L.Unlock()
为什么不直接使用
c.Wait(),非要在循环里面用?
当 Wait() 返回时,我们不能假定等待的条件立刻为真。在我们的 goroutine 被唤醒时,别的 goroutine 可能已经操作了共享状态,导致条件为假。因此,为了应付这种情况,我们把 Wait() 放进循环。
在宝可梦的例子里,我们也提及过这个问题。
循环持续检查条件,只有条件为真时 goroutine 才能跳出来。
Cond.Signal() & Cond.Broadcast()
Signal() 用于唤醒一个正在等待条件变量的 goroutine。
- 如果没有在等待的 goroutine,
Signal()不会做任何操作。 - 如果有 goroutine 等待,
Signal()会唤醒队列中的第一个 goroutine。所以,如果你启动了大量 goroutine,从 0 到 n,只有第 0 个会被Signal()唤醒。
我们过一个例子:
func main() {
cond := sync.NewCond(&sync.Mutex{})
for i := range 10 {
go func(i int) {
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait()
fmt.Println(i)
}(i)
time.Sleep(time.Millisecond)
}
time.Sleep(100 * time.Millisecond) // 等待goroutine启动
cond.Signal()
time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}
// Output:
// 0Signal() 被用来唤醒一个 goroutine,告诉它条件可能被满足。它的实现是这样的:
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}你不需要在调用 Signal() 之前拿锁,但这么做通常是个好主意,尤其是你想修改共享数据,而这份数据又在被并发访问的情况下。
那 cond.Broadcast() 呢?
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}Broadcast() 被调用时唤醒所有的 goroutine,将它们从队列清除。它的内部逻辑很简单,隐藏在 runtime_notifyListNotifyAll() 里。
func main() {
cond := sync.NewCond(&sync.Mutex{})
for i := range 10 {
go func(i int) {
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait()
fmt.Println(i)
}(i)
}
time.Sleep(100 * time.Millisecond) // 等待goroutine启动
cond.Broadcast()
time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}
// Output:
// 8
// 6
// 3
// 2
// 4
// 5
// 1
// 0
// 9
// 7此时,所有的 goroutine 在 100 毫秒内被唤醒,但顺序无法保证。
当 Broadcast() 被调用时,它把所有等待的 goroutine 标记为准备运行,但它们不会立刻启动,而是被 Go 调度器的底层算法选择,其顺序是不可预测的。
内部原理
我们所有的 Go 博客里都有一节内部原理。理解设计思路和其解决的问题通常很有意义。
Copy checker
sync 包里的 copyChecker 被用来检查 Cond 对象在初次使用后被复制的情况。这里 “初次使用” 包括 Wait() 、 Signal() 以及 Broadcast() 。
如果 Cond 在这些操作后被复制,程序会 panic:“sync.Cond is copied”。
在 sync.WaitGroup 和 sync.Pool 里也有类似的设计,它们使用 noCopy 字段防止复制,但这些例子里不会有 panic 发生。
此处的 copyChecker 事实上只是一个 uintptr ,即保存内存地址的整型,它是这样工作的:
- 初次使用
sync.Cond之后,copyChecker保存它自己的内存地址,指向cond.copyChecker对象。 - 如果该对象被拷贝,
copyChecker的地址(&cond.copyChecker)会发生变化(因为新的拷贝位于不同的内存地址),但copyChecker包含的uintptr不会变。
这里的检查很简单:对比 copyChecker 地址和 uintptr 保存的值,要是不同就 panic。
即使逻辑很简单,如果不熟悉 Go 的原子操作和 unsafe 包的话,其实现看起来会有点古怪:
// copyChecker 保存指向自己的指针来检查对象拷贝
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}由于第一项和最后一项检查基本相同,我们可以将上述代码拆解成两个主要的检查步骤。
第一项检查, uintptr(*c) != uintptr(unsafe.Pointer(c)) ,确认内存地址是否发生变化,如果是,则对象有被拷贝。但这里有一个问题,如果 copyChecker 是第一次被使用,由于还未初始化,两边都是零值。
第二项检查, !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) ,使用 CAS 操作同时处理初始化和检查任务:
- 如果 CAS 成功,说明
copyChecker刚刚被初始化,故对象还没有被复制。 - 如果 CAS 失败,说明
copyChecker已经初始化,那么我们就需要进行最终检查uintptr(*c) != uintptr(unsafe.Pointer(c)),以确认对象未被拷贝。
最后一次检查(和第一次检查完全一致),确保对象在上面的操作之后也没有被复制。
为什么最后还有一次检查?两次检查还不够吗?
第三次检查的原因是前两次检查操作不是原子性的。

如果检查时 copyChecker 是第一次使用,此时它还没有被初始化,为零值。如果这样,前两次检查就会错误通过,尽管对象没有被拷贝,但它也没有被初始化。
notifyList:基于 tick 的提醒列表
除了锁和复制检查机制, sync.Cond 的另一个重要部分就是 notifyList 。
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock uintptr
head unsafe.Pointer
tail unsafe.Pointer
}如今,sync 包和运行时包的 notifyList 不一样,但使用了相同的名字和内存布局(这是故意为之)。为了理解其运行逻辑,我们得看看运行时包中的版本:
type notifyList struct {
wait atomic.Uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}如果你看到 head 和 tail,你大概会猜这是某种链表的实现,那你就猜对了。这是一个 sudog 链表(含义是伪 goroutine,pseudo-goroutine),代表着等待同步事件的 goroutine,比如等待从 channel 收发数据,等待一个条件变量。

head 和 tail 是链表中首尾两个 goroutine。同时, wait 和 notify 字段是持续增长的 ticket 号,代表队列中的一个位置。
wait:代表要发放给等待状态 goroutine 的下一个 ticket。notify:代表下一个应该被通知,或者说唤醒的位置。
这就是 notifyList 背后的核心思想了,结合起来之后是这样的。
notifyListAdd()
当一个 goroutine 将要等待一次通知时,它会调用 notifyListAdd() 来获取 ticket。
func (c *Cond) Wait() {
c.checker.check()
// 获取ticket
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 将goroutine加入队列并阻塞
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}ticket 分配由一个原子计数器负责。因此,当一个 goroutine 的调用 notifyListAdd() 时,计数器递增,goroutine 会拿到下一个可用的 ticket 号。
每一个 goroutine 都能拿到一个唯一的 ticket 号,这一过程无需任何锁。这也意味着多个 goroutine 能够同时请求 ticket,无需等待。
例如,如果 ticket 计数器当前指向 5,调用 notifyListAdd() 的 goroutine 会拿到 ticket 号 5,计数器会自增到 6,等待下一次调用。 wait 字段总是指向下一个被发放的 ticket 号。
但这里事情稍微有些绕了。
由于多个 goroutine 能够同时获取 ticket,它们调用 notifyListAdd() 和进入 notifyListWait() 的时机之间有些微间隙。所以尽管 ticket 号是递增发放的,它们的顺序也不能被保证。goroutine 加入链表的顺序可能是 1, 2, 3 ,但最后的顺序可能是 3, 2, 1 或者 2, 1, 3 ,这都取决于间隔时间。

拿到 ticket 之后,goroutine 的下一步就是等待被通知。这发生在 goroutine 调用 notifyListWait(t) 之时, t 就是它刚刚获取的 ticket 号。
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// 如果这张ticket已经被通知,立刻返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 加入链表
s := acquireSudog()
...
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
...
releaseSudog(s)
}首先,goroutine 检查自己的 ticket 是不是已经被通知。
它对比自己的 ticket( t )和当前的通知号。如果通知号已经大于自己的 ticket,他就没必要等待了,会直接跳转到共享资源开始工作。
这种快速检查很重要,特别是在我们分开 Signal() 和 Broadcast() 的情况下。但如果 goroutine 的 ticket 还没被通知,它就把自己加入链表,进入睡眠状态,直到被通知。
notifyListNotifyOne()
当需要通知等待中的 goroutine 时,系统从当前未被通知的最小 ticket 号开始,这由 l.notify 追踪。
func notifyListNotifyOne(l *notifyList) {
// 快速检查:要是没人等待,直接退出
if l.wait.Load() == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
// 再次检查,确保真的需要执行后续逻辑
t := l.notify
if t == l.wait.Load() {
unlock(&l.lock)
return
}
// 更新到下一个需要通知的ticket
atomic.Store(&l.notify, t+1)
// 在链表中找到对应的goroutine
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
// 找到了匹配的goroutine
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4) // 标记goroutine为就绪状态
return
}
}
unlock(&l.lock)
}还记得我们说过 ticket 不保证顺序?
我们可能有 goroutine 的 ticket 分别是 2, 1, 3 ,但通知号总是顺序递增的。所以,当系统要唤醒一个 goroutine 时,就需要遍历链表,找到拿着下一张 ticket 的 goroutine。一旦找到了,它就把它从链表中移除,将之标记为就绪状态。
但这里还有些有趣的地方,有时候这里还会有时机问题。假设一个 goroutine 拿到了 ticket,但是在这个函数运行时还没有被加入链表。
接下来会发生什么?比如,代码执行顺序是: notifyListAdd() -> notifyListNotifyOne() -> notifyListWait() 。
这种情况下,函数遍历链表,但没有找到拿着对应 ticket 的 goroutine。但是不要担心, notifyListWait() 会在 goroutine 调用它时处理这种情况。

还记得前面提到的一次重要检查吗?在 notifyListWait() 里: if less(t, l.notify) {...} 。
这项检查很重要,因为它允许一个 ticket 号小于 l.notify 的 goroutine 立刻就绪。此时,goroutine 跳过等待阶段,直接访问共享资源。
所以,即使 goroutine 还没有加入链表,只要它拿着有效的 ticket,它就也能被通知到。这让整个设计变得非常丝滑,每个 goroutine 都能立刻拿到 ticket,不需要额外等待。一切都不会被阻塞。
notifyListNotifyAll()
现在我们来看最后一部分, Broadcast 或者 notifyListNoftifyAll() 。和 notifyListNotifyOne() 相比,这个函数要简单得多:
func notifyListNotifyAll(l *notifyList) {
// 快速检查:没人等待时立刻退出
if l.wait.Load() == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, l.wait.Load())
unlock(&l.lock)
// 将链表中的所有goroutine标记为就绪
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}这段代码特简单,我想你已经看明白了。基本上, Broadcast() 把链表中所有的 goroutine 标记为就绪,然后清空整个链表。
最后我们用一个警告来结尾:我们很容易就会误用 sync.Cond ,引入一些棘手的,很难 debug 的问题。了解了技术实现后,我建议看看工程师角度的讨论:proposal:sync:remove the Cond type