沧海一粟

天下事有难易乎?为之,则难者亦易矣;不为,则易者亦难矣。

0%

go语言 互斥锁 sync.Mutex

  相比于 Go 语言宣扬的“用通讯的方式共享数据”,通过共享数据的方式来传递信息和协调线程运行的做法其实更加主流,毕竟大多数的现代编程语言,都是用后一种方式作为并发编程的解决方案的。
  我们来了解一下go中的互斥锁sync.Mutex

(1) sync.Mutex是什么

  sync.Mutex是go语言里的一种互斥锁,是保证同步的一种工具。
  类似生活中去医院看病时挂号等医生叫号的过程,有很多患者挂号(协程),只有一个医生(资源),被叫号的患者(拿到锁)可以到诊室里让医生看病。看完病离开诊室(释放锁)。


(2) 为什么要用sync.Mutex

先看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import "sync"

var sum int = 0
var wg sync.WaitGroup
var mu sync.Mutex

// 开10个协程并发执行1000次 sum++
// 如果不加 mu.lock() mu.unlock() 最后的结果不是10000 一般会小于10000
// 把 // mu.lock() // mu.unlock() 前面的注释删除,结果就是10000
// 这个是缓存导致的可见性问题。
// 具体原因是因为sum++不是原子操作,CPU分2条指令执行,CPU执行第2条指令时获取到的缓存sum值和主内存sum值不一致导致
func main() {

// fatal error: sync: unlock of unlocked mutex
// // mu.Unlock()

// 开10个协程并发执行1000次 sum=sum+1
for i := 0; i < 10; i++ {
wg.Add(1)
go funcAdd()
}
// 等所有协程执行完
wg.Wait()
println("result:", sum)

}

func funcAdd() {
for i := 0; i < 1000; i++ {
// 对sum加锁
//mu.Lock()
sum++
// 对sum解锁
//mu.Unlock()
//println("result=", sum)
}
// 没有下面这一行代码 提示 fatal error: all goroutines are asleep - deadlock!
wg.Done()
}

上面的代码主要功能是开10个协程并发执行1000次 sum++,并打印结果,可以自己执行一下,会发现执行的结果<10000。
把第33行 36行前的注释删掉,程序执行的结果就是结果就是10000

(2.1) 互斥锁的作用

互斥锁是保证同步的一种工具,主要体现在以下2个方面:
1、避免多个线程在同一时刻操作同一个数据块 (sum);
2、可以协调多个线程,以避免它们在同一时刻执行同一个代码块 (sum++)。

(2.2) 什么时候用

1、需要保护一个数据或数据块时;
2、需要协调多个协程串行执行同一代码块,避免并发问题时。

 比如 经常遇到A给B转账100元的例子,这个时候就可以用互斥锁来实现。


(3) sync.Mutex的用法

1
2
3
4
var mu sync.Mutex
mu.Lock() // 加锁
// 执行业务逻辑
mu.Unlock() // 释放锁


(4) sync.Mutex原理

go版本 go1.17.1
源码路径 src包 /sync/mutex.go 代码+注释一共才227行,比较简洁

 下面从数据结构、加锁、解锁三方面来解析sync.Mutex原理

(4.1) sync.Mutex数据结构

1
2
3
4
5
6
7
8
9
10
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
// 互斥锁当前的状态
state int32
// 信号量,用于唤醒goroutine
sema uint32
}

  可以看到,互斥锁结构体只有2个变量,互斥锁状态state 和 信号量sema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const (
// mutex is locked
// 是否加锁的标识
mutexLocked = 1 << iota // 1代表加锁
mutexWoken // 2 代表唤醒
mutexStarving // 3 代表饥饿
mutexWaiterShift = iota // 4 代表位移

// 公平锁
//
// 锁有两种模式:正常模式和饥饿模式。
// 在正常模式下,所有的等待锁的goroutine都会存在一个先进先出的队列中(轮流被唤醒)
// 但是一个被唤醒的goroutine并不是直接获得锁,而是仍然需要和那些新请求锁的(new arrivial)
// 的goroutine竞争,而这其实是不公平的,因为新请求锁的goroutine有一个优势——它们正在CPU上
// 运行,并且数量可能会很多。所以一个被唤醒的goroutine拿到锁的概率是很小的。在这种情况下,
// 这个被唤醒的goroutine会加入到队列的头部。如果一个等待的goroutine有超过1ms(写死在代码中)
// 都没获取到锁,那么就会把锁转变为饥饿模式。
//
// 在饥饿模式中,锁的所有权会直接从释放锁(unlock)的goroutine转交给队列头的goroutine,
// 新请求锁的goroutine就算锁是空闲状态也不会去获取锁,并且也不会尝试自旋。它们只是排到队列的尾部。
//
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。

// 正常模式会有比较好的性能,因为即使有很多阻塞的等待锁的goroutine,
// 一个goroutine也可以尝试请求多次锁。
// 饥饿模式对于防止尾部延迟来说非常的重要。
starvationThresholdNs = 1e6
)

  state代表当前锁状态,int类型,最大可以表示2^31-1,state零值是0,这个字段会同时被多个goroutine所共用(使用 atomic.CAS 来保证原子性)
  mutexLocked = 1 代表加锁
  mutexWoken = 2 代表唤醒
  mutexStarving = 3 代表饥饿
  mutexWaiterShift 4 代表位移
  starvationThresholdNs 值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdNs也就是1毫秒,mutex进入饥饿模式。

可以看一下state的状态及枚举

state状态 state状态枚举 对应二进制 对应状态
mutexUnLock state=0 0000 未加锁
mutexLocked state=1 0001 加锁
mutexWoken state=2 0010 唤醒
mutexStarving state=4 0100 饥饿
mutexWaiterShift state=3 0011 代表位移

在看下面代码之前,一定要记住这几个状态之间的 与运算 或运算,否则代码里的与运算或运算

1
2
3
4
5
6
7
8
state:   |32|31|...|3|2|1|
\__________/ | |
| | |
| | mutex的占用状态(1被占用,0可用)
| |
| mutex的当前goroutine是否被唤醒
|
当前阻塞在mutex上的goroutine数

mutex state bit

(4.2) 加锁过程 Lock

加锁流程描述:

1、单协程获取锁,通过CAS把state状态从0设置成1,加锁成功后返回;
2、一个或多个协程获取锁,通过CAS把state状态从0设置成1失败时,代表有冲突,首先自旋,如果其他协程在这段时间内释放了该锁后,获取该锁成功,返回;如果获取锁失败继续往下执行;
3、有冲突,且已经过了自旋阶段,通过信号量进行阻塞;
3.1 刚被唤醒的 加入到等待队列首部;
3.2 新加入的 加入到等待队列的尾部。
4、有冲突,根据不同的模式做处理;
4.1 饥饿模式 获取锁
4.2 正常模式 唤醒,继续循环,回到2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 通过CAS来判断是否加锁,如果执行CAS把互斥锁状态从0设置成1,则加锁成功,返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
//
m.lockSlow()
}

  先忽略race.Enabled相关代码,这个是go做race检测时候用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

func (m *Mutex) lockSlow() {
var waitStartTime int64
// 是否处于饥饿模式
starving := false
// 用来存当前goroutine是否已唤醒
awoke := false
// 用来存当前goroutine的循环次数
iter := 0
// 记录下当前的状态
old := m.state
//
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 不要在饥饿模式下旋转,所有权交给服务员。
// 所以我们无论如何都无法获得互斥。
//
// 看、计算+思考了很长时间,才明白,这块是为了让mutexLocked状态+可以自旋的协程先自旋
// 看这块代码要思考协程状态间为什么设置成位运算
// 加个括号方便理解 ((old&(mutexLocked|mutexStarving)) == mutexLocked) && runtime_canSpin(iter)
// old&(mutexLocked|mutexStarving) 是位运算操作,mutexLocked=0001 mutexStarving=0100 | 运算完是 0101
// 要使 old&0101 == 0001(mutexLocked),old只有是 0001(mutexLocked) 或者 1001 11001 ... ,0010(mutexWoken) 0100(mutexStarving) 都不能使结果=0001
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 将自己的状态以及锁的状态设置为唤醒,这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了
//
// old=0001 1001 ... mutexWoken=0010 old&mutexWoken=0000 old|mutexWoken=0011
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 设置当前goroutine的state
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 当前goroutine已唤醒
awoke = true
}
// 主动自旋
runtime_doSpin()
// 循环次数加一
iter++
// 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变)
old = m.state
continue
}

// 到了这一步, state的状态可能是:
// 1. 锁还没有被释放,锁处于正常状态
// 2. 锁还没有被释放, 锁处于饥饿状态
// 3. 锁已经被释放, 锁处于正常状态
// 4. 锁已经被释放, 锁处于饥饿状态
//
// 并且当前gorutine的awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果old state状态不是饥饿状态, new state 设置锁状态
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 将等待队列的等待者的数量加1 实际上是new=new+8
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
// 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
// 总之state的新状态不再是woken状态.
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

(4.3) 解锁过程 UnLock

References

[1] go中sync.Mutex源码解读
[2] 这可能是最容易理解的 Go Mutex 源码剖析
[3] [Go并发] - Mutex源码解析