package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
/*
运行结果
worker 3 started job 1
worker 2 started job 3
worker 1 started job 2
worker 1 finished job 2
worker 1 started job 4
worker 2 finished job 3
worker 2 started job 5
worker 3 finished job 1
worker 1 finished job 4
worker 2 finished job 5
这个程序首先创建了一个 jobs channel 和一个 results channel。
然后,它启动了 3 个 worker goroutine,每个 worker goroutine
都在一个无限循环中从 jobs channel 接收任务,并将结果发送到 results channel。
然后,主 goroutine 向 jobs channel 发送 5 个任务,并关闭 jobs channel,表示这些就是所有任务了。
最后,主 goroutine 从 results channel 收集所有的结果。
这个程序的性能取决于许多因素,包括 CPU 的速度、内存的大小、操作系统的调度策略等。
在一般情况下,使用 goroutine 和 channel 进行并发处理可以有效地利用多核 CPU,提高程序的性能。
但是,如果 goroutine 的数量过多,或者 channel 的使用不当,可能会导致性能下降。
因此,你需要根据你的具体需求和环境来调整并发策略。
*/
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动 3 个 worker,初始是阻塞的,因为 jobs 还是空的。
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送 5 个 jobs,然后关闭 jobs channel,表示这些就是所有任务了。
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 收集所有的结果。
for a := 1; a <= numJobs; a++ {
<-results
}
}
信号量
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
v map[string]int
mux sync.Mutex
}
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
c.v[key]++
c.mux.Unlock()
}
func (c *SafeCounter) Value(key string) int {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
defer c.mux.Unlock()
return c.v[key]
}
/*
运行结果
go run example_16_mutex.go
1000
在这个例子中,SafeCounter 是一个并发安全的计数器。它使用 sync.Mutex 来保护其内部的 map。
当你调用 Inc 方法时,它会先锁定 Mutex,然后增加计数器的值,最后解锁 Mutex。
当你调用 Value 方法时,它也会先锁定 Mutex,然后返回计数器的值,最后解锁 Mutex。
在 main 函数中,我们启动了 1000 个 goroutine,每个 goroutine 都会调用 Inc 方法。
由于 Inc 方法是并发安全的,所以即使在并发的情况下,计数器的值也是正确的。
*/
func main() {
c := SafeCounter{v: make(map[string]int)}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc("somekey")
wg.Done()
}()
}
wg.Wait()
fmt.Println(c.Value("somekey"))
}
package main
import (
"fmt"
"sync"
"time"
)
/*
运行结果
go run example_17_sync_cond.go
goroutine2 waiting
goroutine1 waiting
main goroutine ready
goroutine1 true
goroutine2 true
在 Go 语言中,你可以使用 sync.Cond 来进行并发同步。sync.Cond 是一个条件变量,它可以让一个或多个等待的 goroutine 知道某个条件已经成立。
以下是一个简单的例子:
在这个例子中,我们有两个 goroutine 都在等待 sharedRsc 变为 true。它们使用 c.Wait() 来等待这个条件。
当 main goroutine 将 sharedRsc 设置为 true 并调用 c.Broadcast() 时,所有等待的 goroutine 都会被唤醒。
注意,sync.Cond 必须和一个 sync.Mutex 或 sync.RWMutex 一起使用,以避免并发的条件竞争。
在调用 Wait、Signal 和 Broadcast 方法时,必须先锁定这个互斥锁
*/
func main() {
var wg sync.WaitGroup
var sharedRsc = false
var m = sync.Mutex{}
c := sync.NewCond(&m)
wg.Add(1)
go func() {
defer wg.Done()
c.L.Lock()
for sharedRsc == false {
fmt.Println("goroutine1 waiting")
c.Wait()
}
fmt.Println("goroutine1", sharedRsc)
c.L.Unlock()
}()
wg.Add(1)
go func() {
defer wg.Done()
c.L.Lock()
for sharedRsc == false {
fmt.Println("goroutine2 waiting")
c.Wait()
}
fmt.Println("goroutine2", sharedRsc)
c.L.Unlock()
}()
time.Sleep(2 * time.Second)
c.L.Lock()
fmt.Println("main goroutine ready")
sharedRsc = true
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
同步
package main
import (
"fmt"
"sync"
"time"
)
type SafeCounter struct {
v map[string]int
mux sync.Mutex
cond *sync.Cond
}
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
c.v[key]++
c.mux.Unlock()
c.cond.Broadcast()
}
func (c *SafeCounter) Value(key string) int {
c.mux.Lock()
defer c.mux.Unlock()
for c.v[key] == 0 {
c.cond.Wait()
}
return c.v[key]
}
/*
����
go run example_18_mutex_cond.go
1
����������У�SafeCounter ��һ��������ȫ�ļ���������ʹ�� sync.Mutex ���������ڲ��� map��ʹ�� sync.Cond ���ȴ�ij����������������ֵ��Ϊ 0��������
������� Inc ����ʱ������������ Mutex��Ȼ�����Ӽ�������ֵ�������� Mutex����֪ͨ���еȴ��� goroutine��
������� Value ����ʱ������������ Mutex��Ȼ����һ��ѭ���еȴ���������ֵ��Ϊ 0����ؼ�������ֵ�������� Mutex��
�������չʾ�����ʹ�� sync.Mutex �� sync.Cond ��ʵ���̰߳�ȫ�IJ�����̡�
*/
func main() {
c := SafeCounter{v: make(map[string]int)}
c.cond = sync.NewCond(&c.mux)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(c.Value("somekey"))
}()
time.Sleep(time.Second)
c.Inc("somekey")
wg.Wait()
}
生产者消费者
package main
import (
"fmt"
"sync"
)
type Item struct {
value string
}
type Buffer struct {
items []Item
lock sync.Mutex
cond *sync.Cond
}
func NewBuffer() *Buffer {
b := &Buffer{}
b.cond = sync.NewCond(&b.lock)
return b
}
func (b *Buffer) Produce(item Item) {
b.lock.Lock()
b.items = append(b.items, item)
b.lock.Unlock()
b.cond.Broadcast()
}
func (b *Buffer) Consume() Item {
b.lock.Lock()
for len(b.items) == 0 {
b.cond.Wait()
}
item := b.items[0]
b.items = b.items[1:]
b.lock.Unlock()
return item
}
/*
����
go run example_19_produce_sonsume.go
consume: item 0
consume: item 1
consume: item 2
consume: item 3
consume: item 4
consume: item 5
consume: item 6
consume: item 7
consume: item 8
consume: item 9
����������У�Buffer ��һ��������ȫ�Ļ���������ʹ�� sync.Mutex ���������ڲ�����Ƭ��ʹ�� sync.Cond ���ȴ���������Ϊ�ա�
������� Produce ����ʱ������������ Mutex��Ȼ��һ��Ԫ�����ӵ���Ƭ�������� Mutex����֪ͨ���еȴ��� goroutine��
������� Consume ����ʱ������������ Mutex��Ȼ����һ��ѭ���еȴ���Ƭ��Ϊ�գ�Ȼ�����Ƭ��ȡ��һ��Ԫ�أ������� Mutex��
�������չʾ�����ʹ�� sync.Mutex �� sync.Cond ��ʵ��������-������ģ�͡�
*/
func main() {
b := NewBuffer()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
b.Produce(Item{value: fmt.Sprintf("item %d", i)})
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
item := b.Consume()
fmt.Println("consume:", item.value)
}
}()
wg.Wait()
}
读写锁
package main
import (
"fmt"
"sync"
//"time"
)
type SafeCounter struct {
v map[string]int
mux sync.RWMutex
}
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
c.v[key]++
c.mux.Unlock()
}
func (c *SafeCounter) Value(key string) int {
c.mux.RLock()
defer c.mux.RUnlock()
return c.v[key]
}
/*
在这个例子中,SafeCounter 是一个并发安全的计数器。它使用 sync.RWMutex 来保护其内部的 map。
当你调用 Inc 方法时,它会先锁定 RWMutex,然后增加计数器的值,最后解锁 RWMutex。
当你调用 Value 方法时,它会先获取 RWMutex 的读锁,然后返回计数器的值,最后解锁 RWMutex。
*/
func main() {
c := SafeCounter{v: make(map[string]int)}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
c.Inc("somekey")
fmt.Println(c.Value("somekey"))
}(i)
}
wg.Wait()
}