1 并行与并发
1.1 概念
参考资料:https://talks.golang.org/2012/waza.slide
概念
特点
串行(serialism)
使用单核CPU执行,多个任务执行时间上不重叠
A任务执行完才开始B任务
并行(parallelism)
使用多核CPU执行,多个任务执行时间上是重叠的
同一时刻A、B同时执行
并发(concurrency)
跟CPU和执行没有必然关系
是一种构造程序的方式,把任务分解为一个个独立运行的小任务。通信是协调这些小任务的手段
- Go支持并发,goroutine来并发执行,channel实现消息传递和同步,select实现多路并发控制
1.2 并发模型
参考资料:https://zhuanlan.zhihu.com/p/455843256
分类
并发模型
特点
基于共享内存
(Shared Memory)
多线程编程
通过共享内存实现通信,用lock、condition规定执行顺序
基于消息传递
(Message Passing)
Actor
主角是Actor,类似一种worker,Actor彼此之间直接发送消息,不需要经过什么中介,消息是异步发送和处理的
CSP
(Communicating Sequential Processes)
worker之间不直接彼此联系,而是通过不同channel进行消息发布和侦听。消息的发送者和接收者之间通过Channel松耦合,发送者不知道自己消息被哪个接收者消费了,接收者也不知道是哪个发送者发送的消息
- Go语言的并发模型是CSP,提倡通过通信共享内存而不是通过共享内存而实现通信。
2 Go并发模型
参考资料:https://www.topgoer.com/并发编程/GMP原理与调度.html
2.1 GMP模型
基于GMP模型实现用户态线程: - G:goroutine,go协程,用户态轻量级线程,使用go关键词创建
- M:machine:内核级线程,所有的goroutine最终都要分配到M上执行
- P:processor,调度器,负责调度G到M上执行。个数为GOMAXPROC,一般与CPU核数一致
runtime.GOMAXPROCS(runtime.NumCPU())
概念解释如下:
- 全局队列(Global Queue):存放等待运行的 G。
- P 的本地队列:同全局队列类似,存放的也是等待运行的 G,存的数量有限,不超过 256 个。新建 G’时,G’优先加入到 P 的本地队列,如果队列满了,则会把本地队列中一半的 G 移动到全局队列。
- P 列表:所有的 P 都在程序启动时创建,并保存在数组中,最多有 GOMAXPROCS(可配置) 个。
- M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,P 队列为空时,M 也会尝试从全局队列拿一批 G 放到 P 的本地队列,或从其他 P 的本地队列偷一半放到自己 P 的本地队列。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
2.2 go func()调度流程
1、我们通过 go func () 来创建一个 goroutine;
2、有两个存储 G 的队列,一个是局部调度器 P 的本地队列、一个是全局 G 队列。新创建的 G 会先保存在 P 的本地队列中,如果 P 的本地队列已经满了就会保存在全局的队列中;
3、G 只能运行在 M 中,一个 M 必须持有一个 P,M 与 P 是 1:1 的关系。M 会从 P 的本地队列弹出一个可执行状态的 G 来执行,如果 P 的本地队列为空,就会想其他的 MP 组合偷取一个可执行的 G 来执行;
4、一个 M 调度 G 执行的过程是一个循环机制;
5、当 M 执行某一个 G 时候如果发生了 syscall 或则其余阻塞操作,M 会阻塞,如果当前有一些 G 在执行,runtime 会把这个线程 M 从 P 中摘除 (detach),然后再创建一个新的操作系统的线程 (如果有空闲的线程可用就复用空闲线程) 来服务于这个 P;
6、当 M 系统调用结束时候,这个 G 会尝试获取一个空闲的 P 执行,并放入到这个 P 的本地队列。如果获取不到 P,那么这个线程 M 变成休眠状态, 加入到空闲线程中,然后这个 G 会被放入全局队列中。
3 常用的并发原语
并发原语的适用场景:
- 共享资源。并发地读写共享资源,会出现数据竞争(data race)的问题,所以需要 Mutex、RWMutex 这样的并发原语来保护。
- 任务编排。需要 goroutine 按照一定的规律执行,而 goroutine 之间有相互等待或者依赖的顺序关系,我们常常使用 WaitGroup 或者 Channel 来实现。
- 消息传递。信息交流以及不同的 goroutine 之间的线程安全的数据交流,常常使用 Channel 来实现。
3.1 共享资源保护
1、sync.Mutex
Mutex 是使用最广泛的同步原语(Synchronization primitives),下面举例说明如何使用。 - 问题代码:没有使用锁。预期结果是100万,实际不会到100万。
package main
import (
“fmt”
“sync”
)
func main() {
var count = 0
// 使用WaitGroup等待10个goroutine完成
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 对变量count执行10次加1
for j := 0; j < 100000; j++ {
count++
}
}()
}
// 等待10个goroutine完成
wg.Wait()
fmt.Println(count)
}
原因:count变量是临界区,count++不是原子操作(读变量值、值++、写变量值)
检测工具:go提供了检测数据竞争的工具:
go run -race xxxx.go解决:count++之前加锁,之后释放锁
package main
import (
“fmt”
“sync”
)
func main() {
// 互斥锁保护计数器
var mu sync.Mutex
// 计数器的值
var count = 0
// 辅助变量,用来确认所有的goroutine都完成
var wg sync.WaitGroup
wg.Add(10)
// 启动10个gourontine
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 累加10万次
for j := 0; j < 100000; j++ {
mu.Lock()
count++
mu.Unlock()
}
}()
}
wg.Wait()
fmt.Println(count)
}
使用Mutex常见的踩坑:
- Lock/Unlock 不配对:只有lock没有unlock会死锁,没有lock直接unlock会panic
- 重入:mutex是不可重入锁。已经获得了资源的lock后,同一个协程再lock也会阻塞等待
- Copy 已使用的 Mutex:临界区被lock的状态可能被同时拷贝,无法获得lock,可以用go vet检测
2、sync.RWMutex
如果是写少读多的场景,可以读写分离,缩小锁的粒度,提升性能
注意: - 用defer来unlock可能导致不必要的加锁时间,所以尽可能在临界区后立即执行unlock
package main
import (
“sync”
“time”
)
func main() {
var counter CounterForRW
for i := 0; i < 10; i++ { // 10个reader
go func() {
for {
counter.Count() // 计数器读操作
time.Sleep(time.Millisecond)
}
}()
}
for { // 一个writer
counter.Incr() // 计数器写操作
time.Sleep(time.Second)
}
}
// 一个线程安全的计数器
type CounterForRW struct {
mu sync.RWMutex
count uint64
}
// 使用写锁保护
func (c *CounterForRW) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 使用读锁保护
func (c *CounterForRW) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}
3、sync/atomic
不需要锁的原子操作。例如把上面加100万的代码,用atomic操作替换mutex:
package main
import (
“fmt”
“sync”
“sync/atomic”
)
func main() {
// 计数器的值
var count int32 = 0
// 辅助变量,用来确认所有的goroutine都完成
var wg sync.WaitGroup
wg.Add(10)
// 启动10个gourontine
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 累加10万次
for j := 0; j < 100000; j++ {
atomic.AddInt32(&count, 1)
}
}()
}
wg.Wait()
fmt.Println(count)
}
4、map
常见的坑:
- 未初始化:会panic
package main
func main() {
var m map[int]int // 异常
//m := map[int]int{} // 正常
//m := make(map[int]int, 0) //正常
m[100] = 100
}
- 并发读写:map不是协程安全的,以下代码会panic
package main
func main() {
var m = make(map[int]int, 10) // 初始化一个map
go func() {
for {
m[1] = 1 //设置key
}
}()
go func() {
for {
_ = m[2] //访问这个map
}
}()
select {}
}
解决map并发读写问题:
- 加锁
package main
import “sync”
func main() {
mu := sync.RWMutex{}
var m = make(map[int]int, 10) // 初始化一个map
go func() {
for {
mu.Lock()
m[1] = 1 //设置key
mu.Unlock()
}
}()
go func() {
for {
mu.RLock()
_ = m[2] //访问这个map
mu.RUnlock()
}
}()
select {}
}
- 官方sync.Map,适用场景:
- 只会增长的缓存系统中,一个 key 只写入一次而被读很多次;
- 多个 goroutine 为不相交的键集读、写和重写键值对。
package main
import “sync”
func main() {
m := sync.Map{}
go func() {
for {
m.Store(1, 1)
}
}()
go func() {
for {
m.Load(2)
}
}()
select {}
}
3.2 任务编排
1、sync.WaitGroup
等待组,用于并发任务的同步
示例:并发请求多个url
package main
import (
“fmt”
“net/http”
“sync”
)
func main() {
// 声明一个等待组
var wg sync.WaitGroup
// 准备一系列的网站地址
var urls = []string{
"http://www.github.com/",
"https://www.qiniu.com/",
"https://www.golangtc.com/",
}
// 遍历这些地址
for _, url := range urls {
// 每一个任务开始时, 将等待组增加1
wg.Add(1)
// 开启一个并发
go func(url string) {
// 使用defer, 表示函数完成时将等待组值减1
defer wg.Done()
// 使用http访问提供的地址
_, err := http.Get(url)
// 访问完成后, 打印地址和可能发生的错误
fmt.Println(url, err)
// 通过参数传递url地址
}(url)
}
// 等待所有的任务完成
wg.Wait()
fmt.Println("over")
}
正确使用姿势:
- 先add后wait
- 保证add和done数量相同:add多会无限等待,done多会panic
- 不要复制重用:必要的场景下可以传引用
2、sync.Once
Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景,示例:
package main
import (
“fmt”
“sync”
)
func main() {
var once sync.Once
// 第一个初始化函数
f1 := func() {
fmt.Println("in f1")
}
once.Do(f1) // 打印出 in f1
// 第二个初始化函数
f2 := func() {
fmt.Println("in f2")
}
once.Do(f2) // 无输出
}
只执行一次的一些实现方式和区别:
- 启动即初始化
- package变量
- init方法
- 支持延迟初始化
- sync.Once
不要踩坑:
- sync.Once
- Do中不要调用本身的Do(避免递归调用),否则会死锁
3、singleflight
把相同的并发请求合并为一次请求:当多个goroutine并发调用同一个函数的时候,可以只让一个goroutine去调用,查到结果后分享给这些goroutine。常用于缓存读场景,缓解热点key对数据库的压力。
举例:根据uid从db查询userName,每10ms来一个查询请求,单次db查询耗时50ms。
1、不使用singleflight,get from db操作有10次,userName都查到了zhangsan
package main
import (
“fmt”
“sync”
“time”
)
func main() {
//sf := singleflight.Group{}
uid := 100
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int, uid int) {
userName := getUserNameFromDb(i, uid)
//userName, _, _ := sf.Do(strconv.Itoa(uid), func() (interface{}, error) {
// return getUserNameFromDb(i, uid), nil
//})
fmt.Printf("i=%d uid=%d userName=%s\n", i, uid, userName)
wg.Done()
}(i, uid)
//每10ms过来一个请求
time.Sleep(10 * time.Millisecond)
}
wg.Wait()
}
//根据uid从db查userName,模拟耗时50ms
func getUserNameFromDb(i, uid int) string {
fmt.Printf(“get from db i=%d uid=%d\n”, i, uid)
time.Sleep(50 * time.Millisecond)
if uid == 100 {
return “zhangsan”
}
return “”
}
2、使用singleflight,get from db操作只有2次,userName也都查到了zhangsan
package main
import (
“fmt”
“golang.org/x/sync/singleflight”
“strconv”
“sync”
“time”
)
func main() {
sf := singleflight.Group{}
uid := 100
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int, uid int) {
//userName := getUserNameFromDb(i, uid)
userName, _, _ := sf.Do(strconv.Itoa(uid), func() (interface{}, error) {
return getUserNameFromDb(i, uid), nil
})
fmt.Printf("i=%d uid=%d userName=%s\n", i, uid, userName)
wg.Done()
}(i, uid)
//每10ms过来一个请求
time.Sleep(10 * time.Millisecond)
}
wg.Wait()
}
//根据uid从db查userName,模拟耗时50ms
func getUserNameFromDb(i, uid int) string {
fmt.Printf(“get from db i=%d uid=%d\n”, i, uid)
time.Sleep(50 * time.Millisecond)
if uid == 100 {
return “zhangsan”
}
return “”
}
4、context
- 用于上下文传递
- WithValue 基于 parent Context 生成一个新的 Context,保存了一个 key-value 键值对
- Value方法会优先从自己存储中查找key,不存在的话向parent Context链式查找
package main
import (
“context”
“fmt”
)
func main() {
ctx1 := context.WithValue(context.Background(), "key1", "0001")
ctx2 := context.WithValue(ctx1, "key2", "0002")
print(ctx1, ctx2)
}
func print(ctx1 context.Context, ctx2 context.Context) {
fmt.Println(ctx1.Value("key1"))
fmt.Println(ctx1.Value("key2"))
fmt.Println(ctx2.Value("key1"))
fmt.Println(ctx2.Value("key2"))
}
- cancel用于任务状态同步(检查Done)
- cancel 是向下传递的,如果一个 WithCancel 生成的 Context 被 cancel 时,如果它的子 Context(也有可能是孙,或者更低,依赖子的类型)也是 cancelCtx 类型的,就会被 cancel,但是不会向上传递。
package main
- cancel 是向下传递的,如果一个 WithCancel 生成的 Context 被 cancel 时,如果它的子 Context(也有可能是孙,或者更低,依赖子的类型)也是 cancelCtx 类型的,就会被 cancel,但是不会向上传递。
import (
“context”
“fmt”
“time”
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
//启动新协程干活,干完后会cancel context
go doSomething(cancel)
//等待context结束
<-ctx.Done()
fmt.Println("ctx is done")
}
func doSomething(cancel context.CancelFunc) {
fmt.Println(“start do something”, time.Now())
<-time.After(3 * time.Second)
fmt.Println(“finish do something”, time.Now())
cancel()
}
- timeout超时控制
- WithTimeout 会返回一个 parent 的副本,并且设置duration时间段后结束
- WithDeadline 会返回一个 parent 的副本,并且设置到一个时刻结束
package main
import (
“context”
“fmt”
“time”
)
func main() {
//3秒后退出
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
foooo(ctx)
}
func foooo(ctx context.Context) {
fmt.Println(time.Now())
//等context超时退出
<-ctx.Done()
fmt.Println(time.Now())
}
3.3 channel消息传递
channel的实现是一个循环队列。
channel存在3种状态:
- nil,未初始化的状态,只进行了声明,或者手动赋值为nil
- active,正常的channel,可读或者可写
- closed,已关闭,千万不要误认为关闭channel后,channel的值是nil(关闭后可以继续读到channel的数据)
channel可进行3种操作: - 读
- 写
- 关闭
9种组合:
for range
不断从channel读取数据,直到读到channel关闭的信号(channel关闭后,原有的数据可以继续读到)
package main
import (
“fmt”
)
func main() {
//新建chan,写10个数据后关闭
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
//消费chan的数据
for data := range ch {
fmt.Println(data)
}
}
异步队列
package main
import (
“fmt”
“time”
)
func producer(ch chan time.Time) {
for {
ch <- time.Now()
time.Sleep(time.Second)
}
}
func consumer(ch chan time.Time) {
for {
fmt.Println(<-ch)
}
}
func main() {
ch := make(chan time.Time, 1)
go producer(ch)
go consumer(ch)
//主协程阻塞
ch1 := make(chan chan int)
<-ch1
}
select多路选择
同时阻塞,知道最先读到的channel被处理
package main
import (
“context”
“fmt”
“time”
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("time after 1 sencond") // 会执行
case <-ctx.Done():
fmt.Println("ctx timeout after 2 second") // 不会执行
}
}
超时和定时任务
超时
package main
import (
“fmt”
“time”
)
func main() {
fmt.Println(time.Now())
<-time.After(time.Second)
fmt.Println(time.Now())
}
定时任务
package main
import (
“fmt”
“time”
)
func producer(ch chan time.Time) {
//for {
// ch <- time.Now()
// time.Sleep(time.Second)
//}
//改成定时器
ticker := time.NewTicker(time.Second)
for {
ch <- <-ticker.C
}
}
func consumer(ch chan time.Time) {
for {
fmt.Println(<-ch)
}
}
func main() {
ch := make(chan time.Time, 1)
go producer(ch)
go consumer(ch)
//主协程阻塞
ch1 := make(chan chan int)
<-ch1
}
并发数控制
package main
import (
“fmt”
“sync”
“time”
)
func main() {
ch := make(chan int, 10) // 控制最多10个协程并发
wg := sync.WaitGroup{} //等待子协程结束
//100个任务,共需要处理10s
for i := 0; i < 100; i++ {
wg.Add(1)
ch <- 1
go func() {
doWork()
wg.Done()
<-ch
}()
}
wg.Wait()
}
//每个子任务耗时1s
func doWork() {
fmt.Println(time.Now())
time.Sleep(time.Second)
}
或者基于timer实现的limiter:
package main
import (
“context”
“fmt”
“golang.org/x/time/rate”
“time”
)
func main() {
ctx := context.Background()
limiter := rate.NewLimiter(2, 1) // 每秒最多调用2次
for {
limiter.Wait(ctx)
go doWork()
}
}
//每个子任务耗时1s
func doWork() {
fmt.Println(time.Now())
time.Sleep(time.Second)
}