一五一、Go入门到进阶:并发编程

1 并行与并发
1.1 概念
参考资料:https://talks.golang.org/2012/waza.slide
概念
特点
串行(serialism)
使用单核CPU执行,多个任务执行时间上不重叠
A任务执行完才开始B任务
并行(parallelism)
使用多核CPU执行,多个任务执行时间上是重叠的
同一时刻A、B同时执行
并发(concurrency)
跟CPU和执行没有必然关系
是一种构造程序的方式,把任务分解为一个个独立运行的小任务。通信是协调这些小任务的手段

  • Go支持并发,goroutine来并发执行,channel实现消息传递和同步,select实现多路并发控制
    1.2 并发模型
    参考资料:https://zhuanlan.zhihu.com/p/455843256
    分类
    并发模型
    特点
    基于共享内存
    (Shared Memory)
    多线程编程
    通过共享内存实现通信,用lock、condition规定执行顺序
    基于消息传递
    (Message Passing)
    Actor
    主角是Actor,类似一种worker,Actor彼此之间直接发送消息,不需要经过什么中介,消息是异步发送和处理的

CSP
(Communicating Sequential Processes)
worker之间不直接彼此联系,而是通过不同channel进行消息发布和侦听。消息的发送者和接收者之间通过Channel松耦合,发送者不知道自己消息被哪个接收者消费了,接收者也不知道是哪个发送者发送的消息

  • Go语言的并发模型是CSP,提倡通过通信共享内存而不是通过共享内存而实现通信。
    2 Go并发模型
    参考资料:https://www.topgoer.com/并发编程/GMP原理与调度.html
    2.1 GMP模型
    基于GMP模型实现用户态线程:
  • G:goroutine,go协程,用户态轻量级线程,使用go关键词创建
  • M:machine:内核级线程,所有的goroutine最终都要分配到M上执行
  • P:processor,调度器,负责调度G到M上执行。个数为GOMAXPROC,一般与CPU核数一致
    runtime.GOMAXPROCS(runtime.NumCPU())

概念解释如下:

  • 全局队列(Global Queue):存放等待运行的 G。
  • P 的本地队列:同全局队列类似,存放的也是等待运行的 G,存的数量有限,不超过 256 个。新建 G’时,G’优先加入到 P 的本地队列,如果队列满了,则会把本地队列中一半的 G 移动到全局队列。
  • P 列表:所有的 P 都在程序启动时创建,并保存在数组中,最多有 GOMAXPROCS(可配置) 个。
  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,P 队列为空时,M 也会尝试从全局队列拿一批 G 放到 P 的本地队列,或从其他 P 的本地队列偷一半放到自己 P 的本地队列。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
    2.2 go func()调度流程

​ 1、我们通过 go func () 来创建一个 goroutine;
​ 2、有两个存储 G 的队列,一个是局部调度器 P 的本地队列、一个是全局 G 队列。新创建的 G 会先保存在 P 的本地队列中,如果 P 的本地队列已经满了就会保存在全局的队列中;
​ 3、G 只能运行在 M 中,一个 M 必须持有一个 P,M 与 P 是 1:1 的关系。M 会从 P 的本地队列弹出一个可执行状态的 G 来执行,如果 P 的本地队列为空,就会想其他的 MP 组合偷取一个可执行的 G 来执行;
​ 4、一个 M 调度 G 执行的过程是一个循环机制;
​ 5、当 M 执行某一个 G 时候如果发生了 syscall 或则其余阻塞操作,M 会阻塞,如果当前有一些 G 在执行,runtime 会把这个线程 M 从 P 中摘除 (detach),然后再创建一个新的操作系统的线程 (如果有空闲的线程可用就复用空闲线程) 来服务于这个 P;
​ 6、当 M 系统调用结束时候,这个 G 会尝试获取一个空闲的 P 执行,并放入到这个 P 的本地队列。如果获取不到 P,那么这个线程 M 变成休眠状态, 加入到空闲线程中,然后这个 G 会被放入全局队列中。
3 常用的并发原语
并发原语的适用场景:

  • 共享资源。并发地读写共享资源,会出现数据竞争(data race)的问题,所以需要 Mutex、RWMutex 这样的并发原语来保护。
  • 任务编排。需要 goroutine 按照一定的规律执行,而 goroutine 之间有相互等待或者依赖的顺序关系,我们常常使用 WaitGroup 或者 Channel 来实现。
  • 消息传递。信息交流以及不同的 goroutine 之间的线程安全的数据交流,常常使用 Channel 来实现。
    3.1 共享资源保护
    1、sync.Mutex
    Mutex 是使用最广泛的同步原语(Synchronization primitives),下面举例说明如何使用。
  • 问题代码:没有使用锁。预期结果是100万,实际不会到100万。
    package main

import (
“fmt”
“sync”
)

func main() {
var count = 0
// 使用WaitGroup等待10个goroutine完成
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 对变量count执行10次加1
for j := 0; j < 100000; j++ {
count++
}
}()
}
// 等待10个goroutine完成
wg.Wait()
fmt.Println(count)
}

  • 原因:count变量是临界区,count++不是原子操作(读变量值、值++、写变量值)

  • 检测工具:go提供了检测数据竞争的工具:
    go run -race xxxx.go

  • 解决:count++之前加锁,之后释放锁
    package main

import (
“fmt”
“sync”
)

func main() {
// 互斥锁保护计数器
var mu sync.Mutex
// 计数器的值
var count = 0

// 辅助变量,用来确认所有的goroutine都完成
var wg sync.WaitGroup
wg.Add(10)

// 启动10个gourontine
for i := 0; i < 10; i++ {
	go func() {
		defer wg.Done()
		// 累加10万次
		for j := 0; j < 100000; j++ {
			mu.Lock()
			count++
			mu.Unlock()
		}
	}()
}
wg.Wait()
fmt.Println(count)

}

使用Mutex常见的踩坑:

  • Lock/Unlock 不配对:只有lock没有unlock会死锁,没有lock直接unlock会panic
  • 重入:mutex是不可重入锁。已经获得了资源的lock后,同一个协程再lock也会阻塞等待
  • Copy 已使用的 Mutex:临界区被lock的状态可能被同时拷贝,无法获得lock,可以用go vet检测
    2、sync.RWMutex
    如果是写少读多的场景,可以读写分离,缩小锁的粒度,提升性能
    注意:
  • 用defer来unlock可能导致不必要的加锁时间,所以尽可能在临界区后立即执行unlock
    package main

import (
“sync”
“time”
)

func main() {
var counter CounterForRW
for i := 0; i < 10; i++ { // 10个reader
go func() {
for {
counter.Count() // 计数器读操作
time.Sleep(time.Millisecond)
}
}()
}

for { // 一个writer
	counter.Incr() // 计数器写操作
	time.Sleep(time.Second)
}

}

// 一个线程安全的计数器
type CounterForRW struct {
mu sync.RWMutex
count uint64
}

// 使用写锁保护
func (c *CounterForRW) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

// 使用读锁保护
func (c *CounterForRW) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}

3、sync/atomic
不需要锁的原子操作。例如把上面加100万的代码,用atomic操作替换mutex:
package main

import (
“fmt”
“sync”
“sync/atomic”
)

func main() {
// 计数器的值
var count int32 = 0

// 辅助变量,用来确认所有的goroutine都完成
var wg sync.WaitGroup
wg.Add(10)

// 启动10个gourontine
for i := 0; i < 10; i++ {
	go func() {
		defer wg.Done()
		// 累加10万次
		for j := 0; j < 100000; j++ {
			atomic.AddInt32(&count, 1)
		}
	}()
}
wg.Wait()
fmt.Println(count)

}

4、map
常见的坑:

  • 未初始化:会panic
    package main

func main() {
var m map[int]int // 异常
//m := map[int]int{} // 正常
//m := make(map[int]int, 0) //正常
m[100] = 100
}

  • 并发读写:map不是协程安全的,以下代码会panic
    package main

func main() {
var m = make(map[int]int, 10) // 初始化一个map
go func() {
for {
m[1] = 1 //设置key
}
}()

go func() {
	for {
		_ = m[2] //访问这个map
	}
}()
select {}

}

解决map并发读写问题:

  1. 加锁
    package main

import “sync”

func main() {
mu := sync.RWMutex{}
var m = make(map[int]int, 10) // 初始化一个map
go func() {
for {
mu.Lock()
m[1] = 1 //设置key
mu.Unlock()
}
}()

go func() {
	for {
		mu.RLock()
		_ = m[2] //访问这个map
		mu.RUnlock()
	}
}()
select {}

}

  1. 官方sync.Map,适用场景:
  • 只会增长的缓存系统中,一个 key 只写入一次而被读很多次;
  • 多个 goroutine 为不相交的键集读、写和重写键值对。
    package main

import “sync”

func main() {
m := sync.Map{}
go func() {
for {
m.Store(1, 1)
}
}()

go func() {
	for {
		m.Load(2)
	}
}()
select {}

}

3.2 任务编排
1、sync.WaitGroup
等待组,用于并发任务的同步
示例:并发请求多个url
package main

import (
“fmt”
“net/http”
“sync”
)

func main() {

// 声明一个等待组
var wg sync.WaitGroup

// 准备一系列的网站地址
var urls = []string{
	"http://www.github.com/",
	"https://www.qiniu.com/",
	"https://www.golangtc.com/",
}

// 遍历这些地址
for _, url := range urls {

	// 每一个任务开始时, 将等待组增加1
	wg.Add(1)

	// 开启一个并发
	go func(url string) {

		// 使用defer, 表示函数完成时将等待组值减1
		defer wg.Done()

		// 使用http访问提供的地址
		_, err := http.Get(url)

		// 访问完成后, 打印地址和可能发生的错误
		fmt.Println(url, err)

		// 通过参数传递url地址
	}(url)
}

// 等待所有的任务完成
wg.Wait()

fmt.Println("over")

}

正确使用姿势:

  • 先add后wait
  • 保证add和done数量相同:add多会无限等待,done多会panic
  • 不要复制重用:必要的场景下可以传引用
    2、sync.Once
    Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景,示例:
    package main

import (
“fmt”
“sync”
)

func main() {
var once sync.Once

// 第一个初始化函数
f1 := func() {
	fmt.Println("in f1")
}
once.Do(f1) // 打印出 in f1

// 第二个初始化函数
f2 := func() {
	fmt.Println("in f2")
}
once.Do(f2) // 无输出

}

只执行一次的一些实现方式和区别:

  • 启动即初始化
    • package变量
    • init方法
  • 支持延迟初始化
    • sync.Once
      不要踩坑:
  • Do中不要调用本身的Do(避免递归调用),否则会死锁

3、singleflight
把相同的并发请求合并为一次请求:当多个goroutine并发调用同一个函数的时候,可以只让一个goroutine去调用,查到结果后分享给这些goroutine。常用于缓存读场景,缓解热点key对数据库的压力。

举例:根据uid从db查询userName,每10ms来一个查询请求,单次db查询耗时50ms。
1、不使用singleflight,get from db操作有10次,userName都查到了zhangsan
package main

import (
“fmt”
“sync”
“time”
)

func main() {

//sf := singleflight.Group{}
uid := 100

wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
	go func(i int, uid int) {
		userName := getUserNameFromDb(i, uid)
		//userName, _, _ := sf.Do(strconv.Itoa(uid), func() (interface{}, error) {
		//	return getUserNameFromDb(i, uid), nil
		//})
		fmt.Printf("i=%d uid=%d userName=%s\n", i, uid, userName)
		wg.Done()
	}(i, uid)

	//每10ms过来一个请求
	time.Sleep(10 * time.Millisecond)
}
wg.Wait()

}

//根据uid从db查userName,模拟耗时50ms
func getUserNameFromDb(i, uid int) string {
fmt.Printf(“get from db i=%d uid=%d\n”, i, uid)
time.Sleep(50 * time.Millisecond)
if uid == 100 {
return “zhangsan”
}
return “”
}

2、使用singleflight,get from db操作只有2次,userName也都查到了zhangsan
package main

import (
“fmt”
“golang.org/x/sync/singleflight”
“strconv”
“sync”
“time”
)

func main() {

sf := singleflight.Group{}
uid := 100

wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
	go func(i int, uid int) {
		//userName := getUserNameFromDb(i, uid)
		userName, _, _ := sf.Do(strconv.Itoa(uid), func() (interface{}, error) {
			return getUserNameFromDb(i, uid), nil
		})
		fmt.Printf("i=%d uid=%d userName=%s\n", i, uid, userName)
		wg.Done()
	}(i, uid)

	//每10ms过来一个请求
	time.Sleep(10 * time.Millisecond)
}
wg.Wait()

}

//根据uid从db查userName,模拟耗时50ms
func getUserNameFromDb(i, uid int) string {
fmt.Printf(“get from db i=%d uid=%d\n”, i, uid)
time.Sleep(50 * time.Millisecond)
if uid == 100 {
return “zhangsan”
}
return “”
}

4、context

  • 用于上下文传递
    • WithValue 基于 parent Context 生成一个新的 Context,保存了一个 key-value 键值对
    • Value方法会优先从自己存储中查找key,不存在的话向parent Context链式查找
      package main

import (
“context”
“fmt”
)

func main() {

ctx1 := context.WithValue(context.Background(), "key1", "0001")
ctx2 := context.WithValue(ctx1, "key2", "0002")
print(ctx1, ctx2)

}

func print(ctx1 context.Context, ctx2 context.Context) {

fmt.Println(ctx1.Value("key1"))
fmt.Println(ctx1.Value("key2"))

fmt.Println(ctx2.Value("key1"))
fmt.Println(ctx2.Value("key2"))

}

  • cancel用于任务状态同步(检查Done)
    • cancel 是向下传递的,如果一个 WithCancel 生成的 Context 被 cancel 时,如果它的子 Context(也有可能是孙,或者更低,依赖子的类型)也是 cancelCtx 类型的,就会被 cancel,但是不会向上传递。
      package main

import (
“context”
“fmt”
“time”
)

func main() {
ctx, cancel := context.WithCancel(context.Background())

//启动新协程干活,干完后会cancel context
go doSomething(cancel)

//等待context结束
<-ctx.Done()
fmt.Println("ctx is done")

}

func doSomething(cancel context.CancelFunc) {
fmt.Println(“start do something”, time.Now())
<-time.After(3 * time.Second)
fmt.Println(“finish do something”, time.Now())

cancel()

}

  • timeout超时控制
    • WithTimeout 会返回一个 parent 的副本,并且设置duration时间段后结束
    • WithDeadline 会返回一个 parent 的副本,并且设置到一个时刻结束
      package main

import (
“context”
“fmt”
“time”
)

func main() {
//3秒后退出
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
foooo(ctx)
}

func foooo(ctx context.Context) {
fmt.Println(time.Now())

//等context超时退出
<-ctx.Done()

fmt.Println(time.Now())

}

3.3 channel消息传递
channel的实现是一个循环队列。
channel存在3种状态:

  1. nil,未初始化的状态,只进行了声明,或者手动赋值为nil
  2. active,正常的channel,可读或者可写
  3. closed,已关闭,千万不要误认为关闭channel后,channel的值是nil(关闭后可以继续读到channel的数据)
    channel可进行3种操作:
  4. 关闭
    9种组合:

for range
不断从channel读取数据,直到读到channel关闭的信号(channel关闭后,原有的数据可以继续读到)
package main

import (
“fmt”
)

func main() {
//新建chan,写10个数据后关闭
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)

//消费chan的数据
for data := range ch {
	fmt.Println(data)
}

}

异步队列
package main

import (
“fmt”
“time”
)

func producer(ch chan time.Time) {
for {
ch <- time.Now()
time.Sleep(time.Second)
}
}
func consumer(ch chan time.Time) {
for {
fmt.Println(<-ch)
}
}

func main() {
ch := make(chan time.Time, 1)
go producer(ch)
go consumer(ch)

//主协程阻塞
ch1 := make(chan chan int)
<-ch1

}
select多路选择
同时阻塞,知道最先读到的channel被处理
package main

import (
“context”
“fmt”
“time”
)

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

select {
case <-time.After(1 * time.Second):
	fmt.Println("time after 1 sencond") // 会执行
case <-ctx.Done():
	fmt.Println("ctx timeout after 2 second") // 不会执行
}

}
超时和定时任务
超时
package main

import (
“fmt”
“time”
)

func main() {
fmt.Println(time.Now())
<-time.After(time.Second)
fmt.Println(time.Now())
}
定时任务
package main

import (
“fmt”
“time”
)

func producer(ch chan time.Time) {
//for {
// ch <- time.Now()
// time.Sleep(time.Second)
//}

//改成定时器
ticker := time.NewTicker(time.Second)
for {
	ch <- <-ticker.C
}

}
func consumer(ch chan time.Time) {
for {
fmt.Println(<-ch)
}
}

func main() {
ch := make(chan time.Time, 1)
go producer(ch)
go consumer(ch)

//主协程阻塞
ch1 := make(chan chan int)
<-ch1

}
并发数控制
package main

import (
“fmt”
“sync”
“time”
)

func main() {
ch := make(chan int, 10) // 控制最多10个协程并发
wg := sync.WaitGroup{} //等待子协程结束

//100个任务,共需要处理10s
for i := 0; i < 100; i++ {
	wg.Add(1)
	ch <- 1

	go func() {
		doWork()
		wg.Done()
		<-ch
	}()
}
wg.Wait()

}

//每个子任务耗时1s
func doWork() {
fmt.Println(time.Now())
time.Sleep(time.Second)
}
或者基于timer实现的limiter:
package main

import (
“context”
“fmt”
“golang.org/x/time/rate”
“time”
)

func main() {
ctx := context.Background()
limiter := rate.NewLimiter(2, 1) // 每秒最多调用2次
for {
limiter.Wait(ctx)
go doWork()
}
}

//每个子任务耗时1s
func doWork() {
fmt.Println(time.Now())
time.Sleep(time.Second)
}

相关推荐

  1. Go入门并发编程

    2024-06-11 18:38:02       7 阅读
  2. 微信小程序从入门

    2024-06-11 18:38:02       30 阅读
  3. 并发编程

    2024-06-11 18:38:02       32 阅读
  4. go语言并发编程() ——Context

    2024-06-11 18:38:02       10 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-11 18:38:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-11 18:38:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-11 18:38:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-11 18:38:02       18 阅读

热门阅读

  1. PHP运算符:从基础到高级

    2024-06-11 18:38:02       6 阅读
  2. 华为和锐捷设备流统配置

    2024-06-11 18:38:02       8 阅读
  3. Linux下的lvm镜像与快照

    2024-06-11 18:38:02       7 阅读
  4. STM32 UART串口与物联网设备的集成方案

    2024-06-11 18:38:02       9 阅读
  5. PostgreSQL教程

    2024-06-11 18:38:02       4 阅读