YoloKokura

原帖见Go sync.Cond, the Most Overlooked Sync Mechanism

文中对Signal()原理的阐述相当精彩。

BTW,我目前要做的一些开发工作和VictoriaMetrics性能监控强相关,未来会写一些VM和Prometheus的介绍博客也说不定……

sync.Cond是Go语言的一种同步原语,但不像sync.Mutexsync.WaitGroup那样被广泛使用。我们一般很少在项目代码,甚至标准库代码中用到它,反而倾向使用别的同步机制替代。

尽管如此,作为一个Go开发工程师,你应该不太会想在读用了sync.Cond的代码时理解困难,毕竟它也是标准库的一部分呀。

那么这篇文章就能帮你弭平差距,甚至让你清晰地理解条件变量实际上是怎么工作的。

什么是sync.Cond?

现在我们开始拆解sync.Cond

当一个goroutine需要等待特定事件发生,比如某些共享数据的更新,它可以**“阻塞”**,也就是暂停工作,直到得到了恢复运行的消息。要实现这一点,最基本的方法是使用循环,或许再加一句time.Sleep,防止CPU大量空转。

这种方法大概是这样的:

go
// 等待直到条件为true
for !condition {
}

// 或者这样
for !condition {
    time.Sleep(100 * time.Millisecond)
}

但这样做其实不是特别高效,因为循环仍然在后台运行,甚至在无事发生的时候还在占用着CPU循环。

所以我们就需要使用sync.Cond,它提供了一种更好的手段。如果我们更学术一点,我们应该称其为“条件变量”。

  • 当一个goroutine在等待某事发生时(等待特定条件变为真),它可以调用Wait()
  • 另一个goroutine,一旦发现条件被满足,可以调用Signal()Broadcast()来唤醒等待的goroutine,让它们继续工作。

以下是sync.Cond提供的基本接口:

go
// 在条件满足之前阻塞调用它的goroutine
func (c *Cond) Wait() {}

// 如果有的话,唤醒一个正在等待的goroutine
func (c *Cond) Signal() {}

// 唤醒所有正在等待的goroutine
func (c *Cond) Broadcast() {}

来看一个例子。假设我们在等待一种特定的宝可梦,它出现的时候我们想要提醒别的goroutine。

go
var pokemonList = []string{"Pikachu", "Charmander", "Squirtle", "Bulbasaur", "Jigglypuff"}
var cond = sync.NewCond(&sync.Mutex{})
var pokemon = ""

func main() {
	// 消费者
	go func() {
		cond.L.Lock()
		defer cond.L.Unlock()

		// 一直等到皮卡丘出现
		for pokemon != "Pikachu" {
			cond.Wait()
		}
		println("Caught" + pokemon)
		pokemon = ""
	}()

    // 生产者
	go func() {
		// 每1ms随机出现一只宝可梦
		for i := 0; i < 100; i++ {
			time.Sleep(time.Millisecond)

			cond.L.Lock()
			pokemon = pokemonList[rand.Intn(len(pokemonList))]
			cond.L.Unlock()

			cond.Signal()
		}
	}()

	time.Sleep(100 * time.Millisecond) // lazy wait
}

// Output:
// Caught Pikachu

在这个例子里,一个goroutine在等待皮卡丘出现,另一个从列表里随机选择一只宝可梦,并告知消费者。

当生产者发送信号时,消费者被唤醒,并检查是不是出现的要等待的那只宝可梦。如果是,我们就抓住它,反之我们继续等待下一只出现。

问题是,在生产者发送信号和消费者被唤醒之间是有一个时间差的。此时,出现的宝可梦可能改变,因为消费者被唤醒花费的时间可能多于1ms,又或者别的goroutine改变了共享的宝可梦数据。因此,sync.Cond实际上再说:“嘿!有事情发生啦!快醒过来看看,但你要是赖床,事情可能又要变啦。”

要是消费者被唤醒得太迟,宝可梦可能会逃跑,它就不得不重新阻塞。

可我明明可以用channel向其他的goroutine发送宝可梦名称或者信号呀

这当然没问题。事实上,channel常被用来替代sync.Go,就是因为它更简单、更惯用,大多数开发者更熟悉。

在上面的例子里,你可以简单实用一个channel发送宝可梦的名字,或者用一个空结构体struct{}发送信号,而不传递任何数据。但我们的问题不只是用channel传递消息,而是如何处理共享状态。

我们的例子很简单,但要是多个goroutine同时访问共享的宝可梦变量,使用channel会导致什么呢?

  • 要是我们用channel传递宝可梦名称,我们就还要用一把互斥锁保护共享变量。
  • 如果我们仅使用channel发送信号,我们仍然需要互斥锁管理共享状态。
  • 如果我们在生产者里确认宝可梦是皮卡丘,再把它发送给channel,我们还是需要互斥锁。更何况,这种做法违反了分工解耦的设计原则:生产者在处理属于消费者的逻辑。

综上,只要是多个goroutine同时改变共享数据的场景,我们就需要互斥锁来保护共享状态。所以我们经常看到channel和mutex搭配使用的方式,以确保正确同步和数据安全。

行吧,那广播信号又是怎么一回事呢?

好问题!你确实可以通过关闭channel的方式(close(ch))广播信号给所有等待的goroutine。当你这样做的时候,所有从该channel接收消息的goroutine都会得到提醒。但当心,被关闭的channel是不能复用的,一旦关闭就永远关闭了。

BTW,在Go 2里面一直有移除sync.Cond的讨论:proposal: sync: remove the Cond type

那既然如此,sync.Cond到底擅长做什么呢?

在一些场景里,sync.Cond会比channel更合适:

  1. 你可以用channel给goroutine发送信号(给channel传值),或者通知所有的goroutine(关闭channel),但你不能同时做这两件事。sync.Cond支持更细粒度的控制。你能用Signal()唤醒单独的goroutine,或者用Broadcast()唤醒全部goroutine。
  2. 此外,你还可以无限次使用Broadcast(),但你只能关闭一次channel,再关闭一次就panic了。
  3. channel不支持保护共享数据,你必须额外使用互斥锁。但sync.Cond却在同一个包里继承了锁机制和信号机制(性能也更好)。

为什么sync.Cond要内嵌一个锁?

理论上,sync.Cond这样的条件变量没必要附加锁。

用户完全可以在条件变量之外自行管理锁,这样听起来更灵活。这样设计并非出自技术限制,而是为了避免人为错误。

人为管理锁很容易出错,因为条件变量的使用方式不直观:你得先解锁,再Wait(),然后在goroutine被唤醒时再拿锁。这个过程有几分尴尬,而且容易出错(忘了拿锁,或者在错误的时机解锁)。

但为什么这种锁机制看上去这么古怪呢?

通常,调用cond.Wait()的goroutine需要在循环中检查一些贡献状态,比如:

go
for !checkSomeSharedState() {
    cond.Wait()
}

把锁嵌入sync.Cond能帮我们处理前述加锁/解锁的过程,代码会更清晰,不容易出错,后文我们会详细讨论。

怎么使用sync.Cond?

如果你认真看了前面的例子,不难发现消费者里的使用模式:我们总是获取互斥锁后再等待(.Wait())条件满足;我们总在条件满足后再解锁。

除此之外,我们把等待条件写进循环里:

go
// 消费者
go func() {
    cond.L.Lock()
    defer cond.L.Unlock()

    // 等待皮卡丘出现
    for pokemon != "Pikachu" {
        cond.Wait()
    }
    println("Caught" + pokemon)
}()

Cond.Wait()

当我们调用sync.CondWait()方法时,我们是在命令当前goroutine阻塞直至某个条件满足。

这幕后其实发生了这些事:

  1. goroutine被添加到一个等待列表中(记录了等待相同条件的goroutine),这些goroutine都被阻塞,也即,除非被Singnal()Broadcast()唤醒,不得继续工作。
  2. 关键部分:互斥锁必须在Wait()之前上锁,因为Wait()会在goroutine沉睡前自动释放锁(调用Unlock())。这将允许其它goroutine获取锁,并在之前goroutine等待时完成它们的工作。
  3. 当goroutine被唤醒时(通过Signal()Broadcast()),它会先重新获取锁(Lock()),再工作。

Wait()底层是这样工作的:

go
func (c *Cond) Wait() {
	// 检查Cond实例是否有被复制
	c.checker.check()

	// 获取ticket
	t := runtime_notifyListAdd(&c.notify)

	// 解锁
	c.L.Unlock()

	// 阻塞goroutine
	runtime_notifyListWait(&c.notify, t)

	// 重新拿锁
	c.L.Lock()
}

尽管这个过程不难,我们还是总结出了4点:

  1. checker用于防止Cond实例被复制,如果这样,它会panic。
  2. 调用cond.Wait()会立即释放互斥锁,故互斥锁在cond.Wait()被调用之前必须被获取到,否则会panic。
  3. 在被唤醒之后,cond.Wait()会重新获取互斥锁,这表明完成了对共享数据的操作后需要再次将之解锁。
  4. sync.Cond的大部分功能都基于内部数据结构notifyList实现,这个结构使用基于ticket的通知方式。

因为存在这种加锁/解锁的行为,你在使用sync.Cond.Wait()时必须遵从特定规范以免出错:

go
c.L.Lock()
for !condition() {
	c.Wait()
}
// 基于条件执行任务
c.L.Unlock()

为什么不直接使用c.Wait(),非要在循环里面用?

Wait()返回时,我们不能假定等待的条件立刻为真。在我们的goroutine被唤醒时,别的goroutine可能已经操作了共享状态,导致条件为假。因此,为了应付这种情况,我们把Wait()放进循环。

在宝可梦的例子里,我们也提及过这个问题。

循环持续检查条件,只有条件为真时goroutine才能跳出来。

Cond.Signal() & Cond.Broadcast()

Signal()用于唤醒一个正在等待条件变量的goroutine。

  • 如果没有在等待的goroutine,Signal()不会做任何操作。
  • 如果有goroutine等待,Signal()会唤醒队列中的第一个goroutine。所以,如果你启动了大量goroutine,从0到n,只有第0个会被Signal()唤醒。

我们过一个例子:

go
func main() {
	cond := sync.NewCond(&sync.Mutex{})
	for i := range 10 {
		go func(i int) {
			cond.L.Lock()
			defer cond.L.Unlock()
			cond.Wait()

			fmt.Println(i)
		}(i)

		time.Sleep(time.Millisecond)
	}

	time.Sleep(100 * time.Millisecond) // 等待goroutine启动
	cond.Signal()
	time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}

// Output:
// 0

Signal()被用来唤醒一个goroutine,告诉它条件可能被满足。它的实现是这样的:

go
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

你不需要在调用Signal()之前拿锁,但这么做通常是个好主意,尤其是你想修改共享数据,而这份数据又在被并发访问的情况下。

cond.Broadcast()呢?

go
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

Broadcast()被调用时唤醒所有的goroutine,将它们从队列清除。它的内部逻辑很简单,隐藏在runtime_notifyListNotifyAll()里。

go
func main() {
	cond := sync.NewCond(&sync.Mutex{})
	for i := range 10 {
		go func(i int) {
			cond.L.Lock()
			defer cond.L.Unlock()

			cond.Wait()
			fmt.Println(i)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // 等待goroutine启动
	cond.Broadcast()
	time.Sleep(100 * time.Millisecond) // 等待goroutine被唤醒
}

// Output:
// 8
// 6
// 3
// 2
// 4
// 5
// 1
// 0
// 9
// 7

此时,所有的goroutine在100毫秒内被唤醒,但顺序无法保证。

Broadcast()被调用时,它把所有等待的goroutine标记为准备运行,但它们不会立刻启动,而是被Go调度器的底层算法选择,其顺序是不可预测的。

内部原理

我们所有的Go博客里都有一节内部原理。理解设计思路和其解决的问题通常很有意义。

Copy checker

sync包里的copyChecker被用来检查Cond对象在初次使用后被复制的情况。这里“初次使用”包括Wait()Signal()以及Broadcast()

如果Cond在这些操作后被复制,程序会panic:“sync.Cond is copied”。

sync.WaitGroupsync.Pool里也有类似的设计,它们使用noCopy字段防止复制,但这些例子里不会有panic发生。

Go sync.WaitGroup and The Alignment Problem

Go sync.Pool and the Mechanics Behind it

此处的copyChecker事实上只是一个uintptr,即保存内存地址的整型,它是这样工作的:

  • 初次使用sync.Cond之后,copyChecker保存它自己的内存地址,指向cond.copyChecker对象。
  • 如果该对象被拷贝,copyChecker的地址(&cond.copyChecker)会发生变化(因为新的拷贝位于不同的内存地址),但copyChecker包含的uintptr不会变。

这里的检查很简单:对比copyChecker地址和uintptr保存的值,要是不同就panic。

即使逻辑很简单,如果不熟悉Go的原子操作和unsafe包的话,其实现看起来会有点古怪:

go
// copyChecker 保存指向自己的指针来检查对象拷贝
type copyChecker uintptr

func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

由于第一项和最后一项检查基本相同,我们可以将上述代码拆解成两个主要的检查步骤。

第一项检查,uintptr(*c) != uintptr(unsafe.Pointer(c)),确认内存地址是否发生变化,如果是,则对象有被拷贝。但这里有一个问题,如果copyChecker是第一次被使用,由于还未初始化,两边都是零值。

第二项检查,!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))),使用CAS操作同时处理初始化和检查任务:

  • 如果CAS成功,说明copyChecker刚刚被初始化,故对象还没有被复制。
  • 如果CAS失败,说明copyChecker已经初始化,那么我们就需要进行最终检查uintptr(*c) != uintptr(unsafe.Pointer(c)),以确认对象未被拷贝。

最后一次检查(和第一次检查完全一致),确保对象在上面的操作之后也没有被复制。

为什么最后还有一次检查?两次检查还不够吗?

第三次检查的原因是前两次检查操作不是原子性的。

如果检查时copyChecker是第一次使用,此时它还没有被初始化,为零值。如果这样,前两次检查就会错误通过,尽管对象没有被拷贝,但它也没有被初始化。

notifyList:基于tick的提醒列表

除了锁和复制检查机制,sync.Cond的另一个重要部分就是notifyList

go
type Cond struct {
	noCopy noCopy
	L Locker

	notify  notifyList

	checker copyChecker
}

type notifyList struct {
	wait   uint32
	notify uint32
	lock   uintptr
	head   unsafe.Pointer
	tail   unsafe.Pointer
}

如今,sync包和运行时包的notifyList不一样,但使用了相同的名字和内存布局(这是故意为之)。为了理解其运行逻辑,我们得看看运行时包中的版本:

go
type notifyList struct {
	wait atomic.Uint32
	notify uint32

	lock mutex

	head *sudog
	tail *sudog
}

如果你看到head和tail,你大概会猜这是某种链表的实现,那你就猜对了。这是一个sudog链表(含义是伪goroutine,pseudo-goroutine),代表着等待同步事件的goroutine,比如等待从channel收发数据,等待一个条件变量。

headtail是链表中首尾两个goroutine。同时,waitnotify字段是持续增长的ticket号,代表队列中的一个位置。

  • wait:代表要发放给等待状态goroutine的下一个ticket。
  • notify:代表下一个应该被通知,或者说唤醒的位置。

这就是notifyList背后的核心思想了,结合起来之后是这样的。

notifyListAdd()

当一个goroutine将要等待一次通知时,它会调用notifyListAdd()来获取ticket。

go
func (c *Cond) Wait() {
	c.checker.check()
	// 获取ticket
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// 将goroutine加入队列并阻塞
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

ticket分配由一个原子计数器负责。因此,当一个goroutine的调用notifyListAdd()时,计数器递增,goroutine会拿到下一个可用的ticket号。

每一个goroutine都能拿到一个唯一的ticket号,这一过程无需任何锁。这也意味着多个goroutine能够同时请求ticket,无需等待。

例如,如果ticket计数器当前指向5,调用notifyListAdd()的goroutine会拿到ticket号5,计数器会自增到6,等待下一次调用。wait字段总是指向下一个被发放的ticket号。

但这里事情稍微有些绕了。

由于多个goroutine能够同时获取ticket,它们调用notifyListAdd()和进入notifyListWait()的时机之间有些微间隙。所以尽管ticket号是递增发放的,它们的顺序也不能被保证。goroutine加入链表的顺序可能是1, 2, 3,但最后的顺序可能是3, 2, 1或者2, 1, 3,这都取决于间隔时间。

拿到ticket之后,goroutine的下一步就是等待被通知。这发生在goroutine调用notifyListWait(t)之时,t就是它刚刚获取的ticket号。

go
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// 如果这张ticket已经被通知,立刻返回
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// 加入链表
	s := acquireSudog()
	...

	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
	...

	releaseSudog(s)
}

首先,goroutine检查自己的ticket是不是已经被通知。

它对比自己的ticket(t)和当前的通知号。如果通知号已经大于自己的ticket,他就没必要等待了,会直接跳转到共享资源开始工作。

这种快速检查很重要,特别是在我们分开Signal()Broadcast()的情况下。但如果goroutine的ticket还没被通知,它就把自己加入链表,进入睡眠状态,直到被通知。

notifyListNotifyOne()

当需要通知等待中的goroutine时,系统从当前未被通知的最小ticket号开始,这由l.notify追踪。

go
func notifyListNotifyOne(l *notifyList) {
	// 快速检查:要是没人等待,直接退出
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// 再次检查,确保真的需要执行后续逻辑
	t := l.notify
	if t == l.wait.Load() {
		unlock(&l.lock)
		return
	}

	// 更新到下一个需要通知的ticket
	atomic.Store(&l.notify, t+1)

	// 在链表中找到对应的goroutine
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			// 找到了匹配的goroutine
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4) // 标记goroutine为就绪状态
			return
		}
	}
	unlock(&l.lock)
}

还记得我们说过ticket不保证顺序?

我们可能有goroutine的ticket分别是2, 1, 3,但通知号总是顺序递增的。所以,当系统要唤醒一个goroutine时,就需要遍历链表,找到拿着下一张ticket的goroutine。一旦找到了,它就把它从链表中移除,将之标记为就绪状态。

但这里还有些有趣的地方,有时候这里还会有时机问题。假设一个goroutine拿到了ticket,但是在这个函数运行时还没有被加入链表。

接下来会发生什么?比如,代码执行顺序是:notifyListAdd() -> notifyListNotifyOne() -> notifyListWait()

这种情况下,函数遍历链表,但没有找到拿着对应ticket的goroutine。但是不要担心,notifyListWait()会在goroutine调用它时处理这种情况。

还记得前面提到的一次重要检查吗?在notifyListWait()里:if less(t, l.notify) {...}

这项检查很重要,因为它允许一个ticket号小于l.notify的goroutine立刻就绪。此时,goroutine跳过等待阶段,直接访问共享资源。

所以,即使goroutine还没有加入链表,只要它拿着有效的ticket,它就也能被通知到。这让整个设计变得非常丝滑,每个goroutine都能立刻拿到ticket,不需要额外等待。一切都不会被阻塞。

notifyListNotifyAll()

现在我们来看最后一部分,Broadcast或者notifyListNoftifyAll()。和notifyListNotifyOne()相比,这个函数要简单得多:

go
func notifyListNotifyAll(l *notifyList) {
	// 快速检查:没人等待时立刻退出
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	atomic.Store(&l.notify, l.wait.Load())
	unlock(&l.lock)

	// 将链表中的所有goroutine标记为就绪
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

这段代码特简单,我想你已经看明白了。基本上,Broadcast()把链表中所有的goroutine标记为就绪,然后清空整个链表。

最后我们用一个警告来结尾:我们很容易就会误用sync.Cond,引入一些棘手的,很难debug的问题。了解了技术实现后,我建议看看工程师角度的讨论:proposal:sync:remove the Cond type

Tags: