sync.Cond:被忽视的同步机制

Golang条件变量的概念在过去很长一段时间一直困扰着我,好像它能派上用场的地方,用经典的channel也没有问题。VictoriaMetrics这篇post很大程度上释清了我的困惑,我将它翻译过来,希望对其它Golang新手有所帮助。

Falldio深圳

原帖见 Go sync.Cond, the Most Overlooked Sync Mechanism

文中对 Signal() 原理的阐述相当精彩。

BTW,我目前要做的一些开发工作和 VictoriaMetrics 性能监控强相关,未来会写一些 VM 和 Prometheus 的介绍博客也说不定……

sync.Cond 是 Go 语言的一种同步原语,但不像 sync.Mutexsync.WaitGroup 那样被广泛使用。我们一般很少在项目代码,甚至标准库代码中用到它,反而倾向使用别的同步机制替代。

尽管如此,作为一个 Go 开发工程师,你应该不太会想在读用了 sync.Cond 的代码时理解困难,毕竟它也是标准库的一部分呀。

那么这篇文章就能帮你弭平差距,甚至让你清晰地理解条件变量实际上是怎么工作的。

什么是 sync.Cond?

现在我们开始拆解 sync.Cond

当一个 goroutine 需要等待特定事件发生,比如某些共享数据的更新,它可以 **“阻塞”**,也就是暂停工作,直到得到了恢复运行的消息。要实现这一点,最基本的方法是使用循环,或许再加一句 time.Sleep ,防止 CPU 大量空转。

这种方法大概是这样的:

go
// 等待直到条件为true
for !condition {
}

// 或者这样
for !condition {
    time.Sleep(100 * time.Millisecond)
}

但这样做其实不是特别高效,因为循环仍然在后台运行,甚至在无事发生的时候还在占用着 CPU 循环。

所以我们就需要使用 sync.Cond ,它提供了一种更好的手段。如果我们更学术一点,我们应该称其为 “条件变量”。

  • 当一个 goroutine 在等待某事发生时(等待特定条件变为真),它可以调用 Wait()
  • 另一个 goroutine,一旦发现条件被满足,可以调用 Signal()Broadcast() 来唤醒等待的 goroutine,让它们继续工作。

以下是 sync.Cond 提供的基本接口:

go
// 在条件满足之前阻塞调用它的goroutine
func (c *Cond) Wait() {}

// 如果有的话,唤醒一个正在等待的goroutine
func (c *Cond) Signal() {}

// 唤醒所有正在等待的goroutine
func (c *Cond) Broadcast() {}

来看一个例子。假设我们在等待一种特定的宝可梦,它出现的时候我们想要提醒别的 goroutine。

go
var pokemonList = []string{"Pikachu", "Charmander", "Squirtle", "Bulbasaur", "Jigglypuff"}
var cond = sync.NewCond(&sync.Mutex{})
var pokemon = ""

func main() {
	// 消费者
	go func() {
		cond.L.Lock()
		defer cond.L.Unlock()

		// 一直等到皮卡丘出现
		for pokemon != "Pikachu" {
			cond.Wait()
		}
		println("Caught" + pokemon)
		pokemon = ""
	}()

    // 生产者
	go func() {
		// 每1ms随机出现一只宝可梦
		for i := 0; i < 100; i++ {
			time.Sleep(time.Millisecond)

			cond.L.Lock()
			pokemon = pokemonList[rand.Intn(len(pokemonList))]
			cond.L.Unlock()

			cond.Signal()
		}
	}()

	time.Sleep(100 * time.Millisecond) // lazy wait
}

// Output:
// Caught Pikachu

在这个例子里,一个 goroutine 在等待皮卡丘出现,另一个从列表里随机选择一只宝可梦,并告知消费者。

当生产者发送信号时,消费者被唤醒,并检查是不是出现的要等待的那只宝可梦。如果是,我们就抓住它,反之我们继续等待下一只出现。

问题是,在生产者发送信号和消费者被唤醒之间是有一个时间差的。此时,出现的宝可梦可能改变,因为消费者被唤醒花费的时间可能多于 1ms,又或者别的 goroutine 改变了共享的宝可梦数据。因此, sync.Cond 实际上再说:“嘿!有事情发生啦!快醒过来看看,但你要是赖床,事情可能又要变啦。”

要是消费者被唤醒得太迟,宝可梦可能会逃跑,它就不得不重新阻塞。

可我明明可以用 channel 向其他的 goroutine 发送宝可梦名称或者信号呀

这当然没问题。事实上,channel 常被用来替代 sync.Go ,就是因为它更简单、更惯用,大多数开发者更熟悉。

在上面的例子里,你可以简单实用一个 channel 发送宝可梦的名字,或者用一个空结构体 struct{} 发送信号,而不传递任何数据。但我们的问题不只是用 channel 传递消息,而是如何处理共享状态。

我们的例子很简单,但要是多个 goroutine 同时访问共享的宝可梦变量,使用 channel 会导致什么呢?

  • 要是我们用 channel 传递宝可梦名称,我们就还要用一把互斥锁保护共享变量。
  • 如果我们仅使用 channel 发送信号,我们仍然需要互斥锁管理共享状态。
  • 如果我们在生产者里确认宝可梦是皮卡丘,再把它发送给 channel,我们还是需要互斥锁。更何况,这种做法违反了分工解耦的设计原则:生产者在处理属于消费者的逻辑。

综上,只要是多个 goroutine 同时改变共享数据的场景,我们就需要互斥锁来保护共享状态。所以我们经常看到 channel 和 mutex 搭配使用的方式,以确保正确同步和数据安全。

行吧,那广播信号又是怎么一回事呢?

好问题!你确实可以通过关闭 channel 的方式( close(ch) )广播信号给所有等待的 goroutine。当你这样做的时候,所有从该 channel 接收消息的 goroutine 都会得到提醒。但当心,被关闭的 channel 是不能复用的,一旦关闭就永远关闭了。

BTW,在 Go 2 里面一直有移除 sync.Cond 的讨论:proposal: sync: remove the Cond type

那既然如此, sync.Cond 到底擅长做什么呢?

在一些场景里, sync.Cond 会比 channel 更合适:

  1. 你可以用 channel 给 goroutine 发送信号(给 channel 传值),或者通知所有的 goroutine(关闭 channel),但你不能同时做这两件事。 sync.Cond 支持更细粒度的控制。你能用 Signal() 唤醒单独的 goroutine,或者用 Broadcast() 唤醒全部 goroutine。
  2. 此外,你还可以无限次使用 Broadcast() ,但你只能关闭一次 channel,再关闭一次就 panic 了。
  3. channel 不支持保护共享数据,你必须额外使用互斥锁。但 sync.Cond 却在同一个包里继承了锁机制和信号机制(性能也更好)。

为什么 sync.Cond 要内嵌一个锁?

理论上, sync.Cond 这样的条件变量没必要附加锁。

用户完全可以在条件变量之外自行管理锁,这样听起来更灵活。这样设计并非出自技术限制,而是为了避免人为错误。

人为管理锁很容易出错,因为条件变量的使用方式不直观:你得先解锁,再 Wait() ,然后在 goroutine 被唤醒时再拿锁。这个过程有几分尴尬,而且容易出错(忘了拿锁,或者在错误的时机解锁)。

但为什么这种锁机制看上去这么古怪呢?

通常,调用 cond.Wait() 的 goroutine 需要在循环中检查一些贡献状态,比如:

go
for !checkSomeSharedState() {
    cond.Wait()
}

把锁嵌入 sync.Cond 能帮我们处理前述加锁 / 解锁的过程,代码会更清晰,不容易出错,后文我们会详细讨论。

怎么使用 sync.Cond?

如果你认真看了前面的例子,不难发现消费者里的使用模式:我们总是获取互斥锁后再等待( .Wait() )条件满足;我们总在条件满足后再解锁。

除此之外,我们把等待条件写进循环里:

go
// 消费者
go func() {
    cond.L.Lock()
    defer cond.L.Unlock()

    // 等待皮卡丘出现
    for pokemon != "Pikachu" {
        cond.Wait()
    }
    println("Caught" + pokemon)
}()

Cond.Wait()

当我们调用 sync.CondWait() 方法时,我们是在命令当前 goroutine 阻塞直至某个条件满足。

这幕后其实发生了这些事:

  1. goroutine 被添加到一个等待列表中(记录了等待相同条件的 goroutine),这些 goroutine 都被阻塞,也即,除非被 Singnal()Broadcast() 唤醒,不得继续工作。
  2. 关键部分:互斥锁必须在 Wait() 之前上锁,因为 Wait() 会在 goroutine 沉睡前自动释放锁(调用 Unlock() )。这将允许其它 goroutine 获取锁,并在之前 goroutine 等待时完成它们的工作。
  3. 当 goroutine 被唤醒时(通过 Signal()Broadcast() ),它会先重新获取锁( Lock() ),再工作。

Wait() 底层是这样工作的:

go
func (c *Cond) Wait() {
	// 检查Cond实例是否有被复制
	c.checker.check()

	// 获取ticket
	t := runtime_notifyListAdd(&c.notify)

	// 解锁
	c.L.Unlock()

	// 阻塞goroutine
	runtime_notifyListWait(&c.notify, t)

	// 重新拿锁
	c.L.Lock()
}

尽管这个过程不难,我们还是总结出了 4 点:

  1. checker 用于防止 Cond 实例被复制,如果这样,它会 panic。
  2. 调用 cond.Wait() 会立即释放互斥锁,故互斥锁在 cond.Wait() 被调用之前必须被获取到,否则会 panic。
  3. 在被唤醒之后, cond.Wait() 会重新获取互斥锁,这表明完成了对共享数据的操作后需要再次将之解锁。
  4. sync.Cond 的大部分功能都基于内部数据结构 notifyList 实现,这个结构使用基于 ticket 的通知方式。

因为存在这种加锁 / 解锁的行为,你在使用 sync.Cond.Wait() 时必须遵从特定规范以免出错:

go
c.L.Lock()
for !condition() {
	c.Wait()
}
// 基于条件执行任务
c.L.Unlock()

为什么不直接使用 c.Wait() ,非要在循环里面用?

Wait() 返回时,我们不能假定等待的条件立刻为真。在我们的 goroutine 被唤醒时,别的 goroutine 可能已经操作了共享状态,导致条件为假。因此,为了应付这种情况,我们把 Wait() 放进循环。

在宝可梦的例子里,我们也提及过这个问题。

循环持续检查条件,只有条件为真时 goroutine 才能跳出来。

Cond.Signal() & Cond.Broadcast()

Signal() 用于唤醒一个正在等待条件变量的 goroutine。

  • 如果没有在等待的 goroutine, Signal() 不会做任何操作。
  • 如果有 goroutine 等待, Signal() 会唤醒队列中的第一个 goroutine。所以,如果你启动了大量 goroutine,从 0 到 n,只有第 0 个会被 Signal() 唤醒。

我们过一个例子:

go
func main() {
	cond := sync.NewCond(&sync.Mutex{})
	for i := range 10 {
		go func(i int) {
			cond.L.Lock()
			defer cond.L.Unlock()
			cond.Wait()

			fmt.Println(i)
		}(i)

		time.Sleep(time.Millisecond)
	}

	time.Sleep(100 * time.Millisecond) // 等待goroutine启动
	cond.Signal()
	time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}

// Output:
// 0

Signal() 被用来唤醒一个 goroutine,告诉它条件可能被满足。它的实现是这样的:

go
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

你不需要在调用 Signal() 之前拿锁,但这么做通常是个好主意,尤其是你想修改共享数据,而这份数据又在被并发访问的情况下。

cond.Broadcast() 呢?

go
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

Broadcast() 被调用时唤醒所有的 goroutine,将它们从队列清除。它的内部逻辑很简单,隐藏在 runtime_notifyListNotifyAll() 里。

go
func main() {
	cond := sync.NewCond(&sync.Mutex{})
	for i := range 10 {
		go func(i int) {
			cond.L.Lock()
			defer cond.L.Unlock()

			cond.Wait()
			fmt.Println(i)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // 等待goroutine启动
	cond.Broadcast()
	time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}

// Output:
// 8
// 6
// 3
// 2
// 4
// 5
// 1
// 0
// 9
// 7

此时,所有的 goroutine 在 100 毫秒内被唤醒,但顺序无法保证。

Broadcast() 被调用时,它把所有等待的 goroutine 标记为准备运行,但它们不会立刻启动,而是被 Go 调度器的底层算法选择,其顺序是不可预测的。

内部原理

我们所有的 Go 博客里都有一节内部原理。理解设计思路和其解决的问题通常很有意义。

Copy checker

sync 包里的 copyChecker 被用来检查 Cond 对象在初次使用后被复制的情况。这里 “初次使用” 包括 Wait()Signal() 以及 Broadcast()

如果 Cond 在这些操作后被复制,程序会 panic:“sync.Cond is copied”。

sync.WaitGroupsync.Pool 里也有类似的设计,它们使用 noCopy 字段防止复制,但这些例子里不会有 panic 发生。

Go sync.WaitGroup and The Alignment Problem

Go sync.Pool and the Mechanics Behind it

此处的 copyChecker 事实上只是一个 uintptr ,即保存内存地址的整型,它是这样工作的:

  • 初次使用 sync.Cond 之后, copyChecker 保存它自己的内存地址,指向 cond.copyChecker 对象。
  • 如果该对象被拷贝, copyChecker 的地址( &cond.copyChecker )会发生变化(因为新的拷贝位于不同的内存地址),但 copyChecker 包含的 uintptr 不会变。

这里的检查很简单:对比 copyChecker 地址和 uintptr 保存的值,要是不同就 panic。

即使逻辑很简单,如果不熟悉 Go 的原子操作和 unsafe 包的话,其实现看起来会有点古怪:

go
// copyChecker 保存指向自己的指针来检查对象拷贝
type copyChecker uintptr

func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

由于第一项和最后一项检查基本相同,我们可以将上述代码拆解成两个主要的检查步骤。

第一项检查, uintptr(*c) != uintptr(unsafe.Pointer(c)) ,确认内存地址是否发生变化,如果是,则对象有被拷贝。但这里有一个问题,如果 copyChecker 是第一次被使用,由于还未初始化,两边都是零值。

第二项检查, !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) ,使用 CAS 操作同时处理初始化和检查任务:

  • 如果 CAS 成功,说明 copyChecker 刚刚被初始化,故对象还没有被复制。
  • 如果 CAS 失败,说明 copyChecker 已经初始化,那么我们就需要进行最终检查 uintptr(*c) != uintptr(unsafe.Pointer(c)) ,以确认对象未被拷贝。

最后一次检查(和第一次检查完全一致),确保对象在上面的操作之后也没有被复制。

为什么最后还有一次检查?两次检查还不够吗?

第三次检查的原因是前两次检查操作不是原子性的。

如果检查时 copyChecker 是第一次使用,此时它还没有被初始化,为零值。如果这样,前两次检查就会错误通过,尽管对象没有被拷贝,但它也没有被初始化。

notifyList:基于 tick 的提醒列表

除了锁和复制检查机制, sync.Cond 的另一个重要部分就是 notifyList

go
type Cond struct {
	noCopy noCopy
	L Locker

	notify  notifyList

	checker copyChecker
}

type notifyList struct {
	wait   uint32
	notify uint32
	lock   uintptr
	head   unsafe.Pointer
	tail   unsafe.Pointer
}

如今,sync 包和运行时包的 notifyList 不一样,但使用了相同的名字和内存布局(这是故意为之)。为了理解其运行逻辑,我们得看看运行时包中的版本:

go
type notifyList struct {
	wait atomic.Uint32
	notify uint32

	lock mutex

	head *sudog
	tail *sudog
}

如果你看到 head 和 tail,你大概会猜这是某种链表的实现,那你就猜对了。这是一个 sudog 链表(含义是伪 goroutine,pseudo-goroutine),代表着等待同步事件的 goroutine,比如等待从 channel 收发数据,等待一个条件变量。

headtail 是链表中首尾两个 goroutine。同时, waitnotify 字段是持续增长的 ticket 号,代表队列中的一个位置。

  • wait :代表要发放给等待状态 goroutine 的下一个 ticket。
  • notify :代表下一个应该被通知,或者说唤醒的位置。

这就是 notifyList 背后的核心思想了,结合起来之后是这样的。

notifyListAdd()

当一个 goroutine 将要等待一次通知时,它会调用 notifyListAdd() 来获取 ticket。

go
func (c *Cond) Wait() {
	c.checker.check()
	// 获取ticket
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// 将goroutine加入队列并阻塞
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

ticket 分配由一个原子计数器负责。因此,当一个 goroutine 的调用 notifyListAdd() 时,计数器递增,goroutine 会拿到下一个可用的 ticket 号。

每一个 goroutine 都能拿到一个唯一的 ticket 号,这一过程无需任何锁。这也意味着多个 goroutine 能够同时请求 ticket,无需等待。

例如,如果 ticket 计数器当前指向 5,调用 notifyListAdd() 的 goroutine 会拿到 ticket 号 5,计数器会自增到 6,等待下一次调用。 wait 字段总是指向下一个被发放的 ticket 号。

但这里事情稍微有些绕了。

由于多个 goroutine 能够同时获取 ticket,它们调用 notifyListAdd() 和进入 notifyListWait() 的时机之间有些微间隙。所以尽管 ticket 号是递增发放的,它们的顺序也不能被保证。goroutine 加入链表的顺序可能是 1, 2, 3 ,但最后的顺序可能是 3, 2, 1 或者 2, 1, 3 ,这都取决于间隔时间。

拿到 ticket 之后,goroutine 的下一步就是等待被通知。这发生在 goroutine 调用 notifyListWait(t) 之时, t 就是它刚刚获取的 ticket 号。

go
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// 如果这张ticket已经被通知,立刻返回
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 加入链表
	s := acquireSudog()
	...

	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
	...

	releaseSudog(s)
}

首先,goroutine 检查自己的 ticket 是不是已经被通知。

它对比自己的 ticket( t )和当前的通知号。如果通知号已经大于自己的 ticket,他就没必要等待了,会直接跳转到共享资源开始工作。

这种快速检查很重要,特别是在我们分开 Signal()Broadcast() 的情况下。但如果 goroutine 的 ticket 还没被通知,它就把自己加入链表,进入睡眠状态,直到被通知。

notifyListNotifyOne()

当需要通知等待中的 goroutine 时,系统从当前未被通知的最小 ticket 号开始,这由 l.notify 追踪。

go
func notifyListNotifyOne(l *notifyList) {
	// 快速检查:要是没人等待,直接退出
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// 再次检查,确保真的需要执行后续逻辑
	t := l.notify
	if t == l.wait.Load() {
		unlock(&l.lock)
		return
	}

	// 更新到下一个需要通知的ticket
	atomic.Store(&l.notify, t+1)

	// 在链表中找到对应的goroutine
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			// 找到了匹配的goroutine
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4) // 标记goroutine为就绪状态
			return
		}
	}
	unlock(&l.lock)
}

还记得我们说过 ticket 不保证顺序?

我们可能有 goroutine 的 ticket 分别是 2, 1, 3 ,但通知号总是顺序递增的。所以,当系统要唤醒一个 goroutine 时,就需要遍历链表,找到拿着下一张 ticket 的 goroutine。一旦找到了,它就把它从链表中移除,将之标记为就绪状态。

但这里还有些有趣的地方,有时候这里还会有时机问题。假设一个 goroutine 拿到了 ticket,但是在这个函数运行时还没有被加入链表。

接下来会发生什么?比如,代码执行顺序是: notifyListAdd() -> notifyListNotifyOne() -> notifyListWait()

这种情况下,函数遍历链表,但没有找到拿着对应 ticket 的 goroutine。但是不要担心, notifyListWait() 会在 goroutine 调用它时处理这种情况。

还记得前面提到的一次重要检查吗?在 notifyListWait() 里: if less(t, l.notify) {...}

这项检查很重要,因为它允许一个 ticket 号小于 l.notify 的 goroutine 立刻就绪。此时,goroutine 跳过等待阶段,直接访问共享资源。

所以,即使 goroutine 还没有加入链表,只要它拿着有效的 ticket,它就也能被通知到。这让整个设计变得非常丝滑,每个 goroutine 都能立刻拿到 ticket,不需要额外等待。一切都不会被阻塞。

notifyListNotifyAll()

现在我们来看最后一部分, Broadcast 或者 notifyListNoftifyAll() 。和 notifyListNotifyOne() 相比,这个函数要简单得多:

go
func notifyListNotifyAll(l *notifyList) {
	// 快速检查:没人等待时立刻退出
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	atomic.Store(&l.notify, l.wait.Load())
	unlock(&l.lock)

	// 将链表中的所有goroutine标记为就绪
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

这段代码特简单,我想你已经看明白了。基本上, Broadcast() 把链表中所有的 goroutine 标记为就绪,然后清空整个链表。

最后我们用一个警告来结尾:我们很容易就会误用 sync.Cond ,引入一些棘手的,很难 debug 的问题。了解了技术实现后,我建议看看工程师角度的讨论:proposal:sync:remove the Cond type