Golang非CSP并发模型外的其他并行方法总结

2018-12-18

Golang最为让人熟知的并发模型当属CSP并发模型,也就是由goroutine和channel构成的GMP并发模型,具体内容不在赘述了,可以翻回之前的文章查看。在这里,要讲讲Golang的其他并发方式。

Golang不仅可以使用CSP并发模式,还可以使用传统的共享数据的并发模式。

临界区(critical section)

这是传统语言比较常用的的方式,即加锁。加锁使其线程同步,每次只允许一个goroutine进入某个代码块,此代码块区域称之为”临界区(critical section)”。

Golang为临界区(critical section)提供的是互斥锁的包和条件变量的包。

互斥锁

就是通常使用的锁,用来让线程串行用的。Golang提供了互斥锁sync.Mutex和读写互斥锁 sync.RWMutex,用法极其简单:

1
2
3
4
5
6
7
var s sync.Mutex

s.Lock()

// 这里的代码就是串行了,吼吼吼。。。

s.Unlock()

Lock和Unlock

sync.Mutexsync.RWMutex的区别

没啥大的区别,只不过sync.RWMutex更加细腻,可以将“读操作”和“写操作”区别对待。
sync.RWMutex中的Lock和unLock针对写操作

1
2
3
4
5
6
7
var s sync.RWMutex

s.Lock()

// 上写锁了,吼吼

s.Unlock()

sync.RWMutex中的RLock和RUnLock针对读操作

1
2
3
4
5
6
7
var s sync.RWMutex

s.RLock()

// 上读锁了,吼吼..

s.RUnlock()

读写锁有以下规则:

  • 写锁被锁定,(再试图进行)读锁和写锁都阻塞
  • 读锁被锁定,(再试图进行)写锁阻塞,(再试图进行)读锁不阻塞

即:多个写操作不能同时进行,写操作和读操作也不能同时进行,多个读操作可以同时进行

注意事项:

  • 不要重复锁定互斥锁;因为代码写起来麻烦,容易出错,万一死锁(deadlock)了就废了。Go语言运行时系统自己抛出的panic都属于致命错误,都是无法恢复的,调用recover函数对它们起不到任何作用。一旦产生死锁,程序必然崩溃。
  • 锁定和解锁一定要成对出现,如果怕忘记解锁,最好是使用defer语句来解锁;但是,一定不要对未锁定的或者已经锁定的互斥锁解锁,因为会触发panic,而且此panic和死锁一样,属于致命错误,程序肯定崩溃
  • sync.Mutex是个结构体,尽量不要其当做参数,在多个函数直接传播。因为没啥意义,Golang的参数都是副本,多个副本之间都是相互独立的。

条件变量Cond

互斥锁是用来锁住资源,“创造”临界区的。而条件变量Cond可以认为是用来自行调度线程(在此即为groutine)的,当某个状态时,阻塞等待,当状态改变时,唤醒。

Cond的使用,离不开互斥锁,即离不开sync.Mutexsync.RWMutex
Cond初始化都需要有个互斥锁。(ps:哪怕初始化不需要,就应用场景而言,也得需要个互斥锁)

Cond提供Wait、Signal、Broadcast 三种方法。
Wait表示线程(groutine)阻塞等待;
Signal表示唤醒等待的groutine;
Broadcast表示唤醒等待的所有groutine;

初始化:

1
cond := sync.NewCond(&sync.Mutex{})

在其中一个groutine中:

1
2
3
4
5
6
cond.L.Lock()
for status == 0 {
cond.Wait()
}
//状态改变,goroutine被唤醒后,干点啥。。。
cond.L.Unlock()

以上算是模板

在另外一个groutine中:

1
2
3
4
cond.L.Lock()
status = 1
cond.Signal() // 或者使用cond.Broadcast()来唤醒以上groutine中沉睡的groutine
cond.L.Unlock()

原子操作(atomicity)

原子操作是硬件芯片级别的支持,所以可以保证绝对的线程安全。而且执行效率比其他方式要高出好几个数量级。

Go语言的原子操作当然也是基于CPU和操作系统的,Go语言提供的原子操作的包是sync/atomic,此包提供了加(Add)、CAS(交换并比较 compare and swap)、成对出现的存储(store)和加载(load)以及交换(swap)。

此包提供的大多数函数针对的数据类型也非常的单一:只有整型!使用方式十分的简单,看着函数直接调用就好。

1
2
3
var a int32
a = 1
a = atomic.AddInt32(&a, 2) //此处是原子操作,就这么简单,吼吼

在此特别强调一下CAS,CAS对应的函数前缀是“CompareAndSwap”,含义和用法正如英文翻译:比较并交换。在进行CAS操作的时候,函数会先判断被操作变量的当前值是否与我们预期的旧值相等,如果相等,它就把新值赋给该变量,并返回true,反之,就忽略此操作,并返回false。

可能是Golang提供的原子操作的数据类型实在是有限,Go又补充了一个结构体atomic.Value,此结构体相当于一个小容器,可以提供原子操作的存储store和提取load

1
2
3
4
5
6
var atomicVal atomic.Value
str := "hello"

atomicVal.Store(str) //此处是原子操作哦

newStr := atomicVal.Load() //此处是原子操作哦

其他

为了能更好的调度goroutine,Go提供了sync.WaitGroupsync.Once还有context

sync.WaitGroup

sync.WaitGroup的作用就是在多goroutine并发程序中,让主goroutine等待所有goroutine执行结束。(直接查看代码注释)
sync.WaitGroup提供了三个函数AddDoneWait三者用法如下:

  • Add 写在主goroutine中,参数为将要运行的goroutine的数量
  • Done 写在各个非主goroutine中,表示运行结束
  • Wait 写在主goroutine中,block主goroutine,等待所有其他goroutine运行结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var wait sync.WaitGroup

wait.Add(2) //必须是运行的goroutine的数量

go func() {
//TODO 一顿小操作
defer wait.Done() // done函数用在goroutine中,表示goroutine操作结束
}()

go func() {
//TODO 一顿小操作
defer wait.Done() // done函数用在goroutine中,表示goroutine操作结束
}()

wait.Wait() // block住了,直到所有goroutine都结束

注意

sync.WaitGroup中有一个计数器,记录的是需要等待的goroutine的数量,默认值是0,可以通过Add方法来增加或者减少值,但是切记,千万不能让计数器的值小于零,会触发panic!

sync.WaitGroup调用Wait方法的时候,sync.WaitGroup中计数器的值一定要为0。因此Add中的值一定要等于非主goroutine的数量!
且不要把Add和Wait方法放到不同的goroutine中执行!

sync.Once

真真正正的只执行一次。

sync.Once只要一个方法:Do,里面就一个参数:func。多说无益,复制下面代码,猜猜执行结果就知道了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"sync"
)

func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}

执行结果

1
Only once

没错,只有一行。真只执行了一次。

context

context可以用来实现一对多的goroutine协作。这个包的应用场景主要是在API中。字面意思也很直接,上下文。当一个请求来时,会产生一个goroutine,但是这个goroutine往往要衍生出许多额外的goroutine去处理操作,例如链接database、请求rpc请求。。等等,这些衍生的goroutine和主goroutine有很多公用数据的,例如同一个请求生命周期、用户认证信息、token等,当这个请求超时或者被取消的时候,这里所有的goroutine都应该结束。context就可以帮助我们达到这个效果。

很显然,主goroutine和衍生的所有子goroutine之间形成了一颗树结构。我们的context可以从根节点遍布整棵树,当然,是线程安全的。

线程之间的基本是这样的:

1
2
3
func DoSomething(ctx context.Context, arg Arg) error {
// ... use ctx ...
}

有两个根context:background和todo;这两个根都是contenxt空的,没有值的。两者也没啥太本质的区别,Background是最常用的,作为Context这个树结构的最顶层的Context,它不能被取消。当不知道用啥context的时候就可以用TODO。

根生成子节点有以下方法:

1
2
3
4
5
6
7
8
9
10
11
//生成可撤销的Context (手动撤销)
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

//生成可定时撤销的Context (定时撤销)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

//也是生成可定时撤销的Context (定时撤销)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

//不可撤销的Context,可以存一个kv的值
func WithValue(parent Context, key, val interface{}) Context

可撤销的Context

以下是每个方法的调用方式(全都来自godoc,可粘贴复用):
可撤销的func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done(): //只有撤销函数被调用后,才会触发
return
case dst <- n:
n++
}
}
}()
return dst
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel() //调用返回的cancel方法来让 context声明周期结束

for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}

要想结束所有线程,就调用ctx, cancel := context.WithCancel(context.Background())函数返回的cancel函数即可,当撤销函数被调用之后,对应的Context值会先关闭它内部的接收通道,也就是它的Done方法返回的通道。

WithDeadlineWithTimeout用法基本类似,而且WithTimeout函数内部调用了WithDeadline函数。两者唯一区别是WithTimeout表示从现在开始xxx超时,而WithDeadline的时间可以是之前的时间:意思是说WithTimeout表示从现在开始, xxx时间后超时。而WithDeadline表示xx时间点,结束!这个时间点可以是昨天,时间点不收任何限制。

以下是godoc给出的列子:

WithDeadline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"context"
"fmt"
"time"
)

func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)

// Even though ctx will be expired, it is good practice to call its
// cancelation function in any case. Failure to do so may keep the
// context and its parent alive longer than necessary.
defer cancel() //时间超时会自动调用

select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err())
}

}

输出:

1
context deadline exceeded

WithTimeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"context"
"fmt"
"time"
)

func main() {
// Pass a context with a timeout to tell a blocking function that it
// should abandon its work after the timeout elapses.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel() //时间超时会自动调用

select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
}

}

输出:

1
context deadline exceeded

不可撤销的context,传递值

WithValue可以用来在传递值的,值的存取是以KV的形式来进行的。直接上例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type favContextKey string

f := func(ctx context.Context, k favContextKey) {
if v := ctx.Value(k); v != nil {
fmt.Println("found value:", v)
return
}
fmt.Println("key not found:", k)
}

k := favContextKey("language")
k1 := favContextKey("Chinese")
ctx := context.WithValue(context.Background(), k, "Go")
ctx1 := context.WithValue(ctx, k1, "Go1")

f(ctx1, k1)
f(ctx1, k)

输出:

1
2
found value: Go1
found value: Go