golang中的并发机制

Golang中的并发机制

Golang的并发处理优势

并发处理是Golang设计的核心目标之一,其通过一条简单的关键字go来启动协程goroutine,在广义上我们可以将协程理解为线程;

由于Golang轻量级的特性,在一个主程序中可是轻而易举地列举上千条协程,线程安全只需要由通道(chan)数据类型实现,而且先天支持多核CPU的调度;

多核CPU并发分担

所谓main()运行在程序的主线程中,当main()退出时,所有的goroutine都将退出,即便这些协程没有完成它的任务;

因此我们在接下来的示例程序中将引入100ms的睡眠无限循环防止程序提前退出,以便其他协程有机会运行,同时避免主协程无效忙等;

请注意,协程的执行是异步的,主协程不会等待协程,而是直接跳过协程,这就是在协程上打断点无效的原因;

在Goalng中查看实际硬件CPU数:

1
cpuCores := runtime.NumCPU()

将可用CPU核数设为n,对于n<1,配置为默认的机器配置;

1
runtime.GOMAXPROCS(n)

测试起见我们将CPU数设为2,因为可能配置太高看不出来共享冲突;

共享冲突

执行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"runtime"
)

var valueG int = 0
var stop1 bool = false
var stop2 bool = false

func routine1(countA int, stopA *bool) {
for i := 0; i < countA; i++ {
valueG = valueG + 2
}
*stopA = true
}

func main() {
runtime.GOMAXPROCS(2)
go routine1(100000000, &stop1)
go routine1(100000000, &stop2)
for {
if stop1 && stop2 {
break
}
}
fmt.Printf("valueG: %v\n", valueG)
}

执行结果如下:

image-20240919015923605

很明显,临界资源的访问没有互斥机制,所以执行的结果也有不同;

我们加上控制机制来保证同一时刻只能由一个任务来访问该数据

共享安全

共享安全:保证在多任务并发处理时共享数据不会因共享冲突而导致错误;

Golang就提供了这样的数据类型来保证共享安全机制:chan管道, 这是一个先入先出的队列

执行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"runtime"
)

var valueG chan int
var stop1 bool = false
var stop2 bool = false

func routine1(countA int, stopA *bool) {
for i := 0; i < countA; i++ {
tmpC := <-valueG
valueG <- (tmpC + 2)
}
*stopA = true
}
func main() {
runtime.GOMAXPROCS(2)
valueG = make(chan int, 1)
defer close(valueG)

go routine1(10000, &stop1)
go routine1(10000, &stop2)
valueG <- 0

for {
if stop1 && stop2 {
break
}
}
fmt.Printf("valueG: %v\n", <-valueG)
}

执行结果如下,可以看到确实输出结果保持了一致:

image-20240919020628894

注意:

  • 理解-><-操作符是阻塞的:
    • 当从通道接收数据时,如果通道中没有数据,接收操作会阻塞,直到有数据可接收;
    • 当向通道发送数据时,如果通道没有接收方,发送操作会阻塞,直到有协程准备接收数据;
    • 这种阻塞行为可以确保并发协程之间的通信是同步的,避免了数据的丢失或竞态条件

获取令牌的任务

对于一些任务,取到令牌的任务才能访问某些数据,访问完毕之后交回令牌供其他任务使用;

以下是示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"runtime"
)

var valueG int
var stop1 bool = false
var stop2 bool = false

var token chan bool

func routine1(countA int, stopA *bool) {
for i := 0; i < countA; i++ {
<-token // 关键
valueG = valueG +2
token <- false
}
*stopA = true
}
func main() {
runtime.GOMAXPROCS(2)
token = make(chan bool, 1)
defer close(token)

go routine1(10000, &stop1)
go routine1(10000, &stop2)

token <- false
for {
if stop1 && stop2 {
break
}
}
fmt.Printf("valueG: %v\n", valueG)
}

在实际应用中,令牌的或许就像这个token一样,等待管道输入过后,任务完成后交回token给下一个任务使用;

多任务归并

并发编程中,主任务将任务拆分成子任务并等待所有子任务完成之后再进行下一步处理的过程称作多任务的归并;

这样做可以充分利用CPU性能,避免资源的过渡浪费;

观察以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"runtime"
)
var goroutineCount = runtime.GOMAXPROCS(0) // 默认CPU核数
var resultBuffer chan float64

func addRoutine(lenT int){
var sumT float64
for i := 0; i < lenT; i++ {
sumT += 1.0
}
resultBuffer <- sumT
}

func addByGoroutine(countA int) float64 {
sumT := 0.0
lenT := countA / goroutineCount
leftT := countA - lenT*goroutineCount
go addRoutine(lenT + leftT)
for i := 1; i < goroutineCount; i++ {
go addRoutine(lenT)
}
for i := 0; i < goroutineCount; i++ {
sumT += <-resultBuffer
}
return sumT
}

func main(){
resultBuffer = make(chan float64, 1)
defer close(resultBuffer)

res := addByGoroutine(1000000)
fmt.Print(res)
}

超时终止机制

利用select监听多个通道,我们可以轻松实现超时终止机制:

1
2
3
4
5
6
7
8
select {
case tmpF = <-resultBuffer1:
sumT1 += tmpF // 任务1
case tmpC = <-resultBuffer2:
sumT2 += tmpC // 任务2
case <-time.After(3 * time.Second):
timeoutFlag = true // 超时
}

注意:time.After将返回一个定时触发的chan类型值

我们可以如下自定义超时函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 休眠指定的秒数后,向通道中写入一个数值表示超时(数值本身不重要)
// chanA是只能写入的单向通道
func realTimeout1(secondsA time.Duration, chanA chan<- bool) {
time.Sleep(secondsA * time.Second)
chanA <- true
}
// 仅用于新建一个通道后启动真正的超时routine,并将该通道返回让select等待通道中有值
func timeout1(secondsA time.Duration) <-chan bool {
chan1 := make(chan bool, 1)
// 传入realTimeout1的chan1被强制转换为只写通道类型
go realTimeout1(secondsA, (chan<- bool)(chan1))
return (<-chan bool)(chan1) // 返回时将chan1强制转换为只读通道类型
}

sync实现并发处理

我们可以利用sync.WaitGroup来实现多任务的归并,初始化sync.WaitGroup,每进行完一个任务计数器减1;

观察如下代码,执行发现每次结果有所不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
)

var groupG sync.WaitGroup
var times = 100000
var valueG = 0

func addRoutine(countA int) {
defer groupG.Done()
for i := 0; i < countA; i++ {
valueG = valueG + 2
}
}
func minusRoutine(countA int) {
defer groupG.Done()
for i := 0; i < countA; i++ {
valueG = valueG - 1
}
}

func main(){
groupG.Add(2)
go addRoutine(times)
fmt.Printf("add value: %d\n", valueG)

go minusRoutine(times)
fmt.Printf("minus value: %d\n", valueG)

groupG.Wait()

fmt.Printf("Final value: %d\n", valueG)
}

这是因为触发了竞态条件:valueG = valueG + 2valueG = valueG - 1 这些操作并不是原子操作,包含多个步骤:读取 valueG 的值,进行计算,然后将结果写回 valueG

在并发环境中,这些步骤可能会被其他 Goroutine 打断,导致结果不一致;

正确的做法是为addminus加锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
)

var groupG sync.WaitGroup
var mutexG sync.Mutex

var times = 100000
var valueG = 0

func addRoutine(countA int) {
defer groupG.Done()
for i := 0; i < countA; i++ {
mutexG.Lock()
valueG = valueG + 2
mutexG.Unlock()
}
}
func minusRoutine(countA int) {
defer groupG.Done()
for i := 0; i < countA; i++ {
mutexG.Lock()
valueG = valueG - 1
mutexG.Unlock()
}
}

func main(){
groupG.Add(2)
go addRoutine(times)
fmt.Printf("add value: %d\n", valueG)

go minusRoutine(times)
fmt.Printf("minus value: %d\n", valueG)

groupG.Wait()

fmt.Printf("Final value: %d\n", valueG)
}

协程和进程的区别

  1. 一个线程可以有多个协程。
  2. 大多数业务场景下,线程进程可以看做是同步机制,而协程则是异步。
  3. 线程是抢占式,而协程是非抢占式的,所以需要用户代码释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
  4. 协程并不是取代线程,而且抽象于线程之上。线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行。