go的singleflight学习

方法

Do
DoChan
Forget

使用示例

sg "golang.org/x/sync/singleflight"

func TestDo(t *testing.T) {
	var g sg.Group
	v, err, _ := g.Do("key", func() (interface{}, error) {
		return "bar", nil
	})
	if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
		t.Errorf("Do = %v; want %v", got, want)
	}
	if err != nil {
		t.Errorf("Do error = %v", err)
	}
}

func TestDoErr(t *testing.T) {
	var g sg.Group
	someErr := errors.New("Some error")
	v, err, _ := g.Do("key", func() (interface{}, error) {
		return nil, someErr
	})
	if err != someErr {
		t.Errorf("Do error = %v; want someErr %v", err, someErr)
	}
	if v != nil {
		t.Errorf("unexpected non-nil value %#v", v)
	}
}

func TestDoDupSuppress(t *testing.T) {
	var g sg.Group
	var wg1, wg2 sync.WaitGroup
	c := make(chan string, 1)
	var calls int32
	// 原子读取calls,如果为1则将wg1 -1,同时从chan中读取内容,然后再写回,
	// sleep 10mills 后,返回 chan 中的内容
	fn := func() (interface{}, error) {
		if atomic.AddInt32(&calls, 1) == 1 {
			fmt.Println("[fn] first call")
			// First invocation.
			// 第一次调用
			wg1.Done()
		}
		fmt.Println("[fn] call")

		v := <-c
		c <- v // pump; make available for any future calls

		// 模拟耗时的下游请求,让更多的协程等待sg的Do结果
		time.Sleep(10 * time.Millisecond) // let more goroutines enter Do

		return v, nil
	}

	const n = 10
	wg1.Add(1)
	for i := 0; i < n; i++ {
		curI := i
		wg1.Add(1)
		wg2.Add(1)
		go func() {
			fmt.Println("[TestDoDupSuppress] curI", curI)
			defer wg2.Done() // 维持协程的结束
			wg1.Done()       // 维持协程的开始
			// 位置1(不是下一行的标识)
			v, err, _ := g.Do("key", fn) // 位置1的下一个位置
			if err != nil {
				t.Errorf("Do error: %v", err)
				return
			}
			if s, _ := v.(string); s != "bar" {
				t.Errorf("Do = %T %v; want %q", v, v, "bar")
			}
		}()
	}
	wg1.Wait() // 全部协程都就绪了,保证所有协程进入位置1
	// 此时只有一个协程进入了Do中,等待在chan上

	c <- "bar"

	wg2.Wait()
	// 此时所有的协程都处理结束

	// 此时的calls一定处于[1, n)
	got := atomic.LoadInt32(&calls)
	fmt.Println("got", got)
	if got <= 0 || got >= n {
		t.Errorf("number of calls = %d; want over 0 and less than %d", got, n)
	}
}

// Test that singleflight behaves correctly after Forget called.
// See https://github.com/golang/go/issues/31420
// 忘记一个key,forget about a key
// 下一次调用时,可以继续使用Do
// Future calls to Do for this key will call the function
// 而不是等待一个更早的请求去直接拿结果
// rather than waiting for an earlier call to complete
func TestForget(t *testing.T) {
	var g sg.Group

	var (
		firstStarted  = make(chan struct{}) // 没有缓冲的通道
		unblockFirst  = make(chan struct{})
		firstFinished = make(chan struct{})
	)

	go func() {
		g.Do("key", func() (i interface{}, e error) {
			// 关闭一个已关闭的channel会导致panic,所以在关闭之前,需要确认channel是否已被关闭
			// 或者处理可能出现的panic
			// 注意
			// 关闭channel并不是必须的
			// 当不会再向channel写入数据时,可以关闭channel
			// 当其他goroutine试图从该channel接收时,它们会立即接收到一个零值而不再阻塞
			close(firstStarted)
			<-unblockFirst
			close(firstFinished)
			return
		})
	}()
	<-firstStarted  // 第一次请求卡在这里等待结果
	g.Forget("key") // 第一次请求的Do还卡在<-unblockFirst上,此时执行Forget

	unblockSecond := make(chan struct{}) // 第二次请求,卡在 <-unblockSecond 中
	secondResult := g.DoChan("key", func() (i interface{}, e error) {
		<-unblockSecond
		return 2, nil
	})

	close(unblockFirst) // 通知第一次请求的Do中执行<-unblockFirst
	<-firstFinished     // 等待第一次请求中的Do执行完成

	// 第三次请求,返回3
	thirdResult := g.DoChan("key", func() (i interface{}, e error) {
		return 3, nil
	})

	close(unblockSecond) // 通知第二次请求的Do执行完成
	r2 := <-secondResult // 读取第二次的结果
	if r2.Val != 2 {
		t.Errorf("We should receive result produced by second call, expected: 2, got %d", r2.Val)
	}
	r3 := <-thirdResult // 读取第三次的结果
	if r3.Val != 2 {
		t.Errorf("We should receive result produced by second call, expected: 2, got %d", r3.Val)
	}
}

// TestDoChan 结果存储到 Chan 中
func TestDoChan(t *testing.T) {
	var g sg.Group
	ch := g.DoChan("key", func() (interface{}, error) {
		return "bar", nil
	})

	res := <-ch
	v := res.Val
	err := res.Err
	if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
		t.Errorf("Do = %v; want %v", got, want)
	}
	if err != nil {
		t.Errorf("Do error = %v", err)
	}
}

// Test singleflight behaves correctly after Do panic.
// See https://github.com/golang/go/issues/41133,这个issues有意思
// 如果panic,则所有协程都panic
func TestPanicDo(t *testing.T) {
	var g sg.Group
	fn := func() (interface{}, error) {
		panic("invalid memory address or nil pointer dereference")
	}

	const n = 5
	waited := int32(n)
	panicCount := int32(0) // 如果panic,则所有协程都panic
	done := make(chan struct{})
	for i := 0; i < n; i++ {
		go func() {
			defer func() {
				// defer中先panicCount、再waited
				// 主函数判断的时候是先判断 waited,再判断 panicCount
				if err := recover(); err != nil {
					t.Logf("Got panic: %v\n%s", err, debug.Stack())
					atomic.AddInt32(&panicCount, 1)
				}

				if atomic.AddInt32(&waited, -1) == 0 {
					close(done)
				}
			}()

			g.Do("key", fn)
		}()
	}

	select {
	case <-done: // 关闭通道
		if panicCount != n {
			t.Errorf("Expect %d panic, but got %d", n, panicCount)
		}
	case <-time.After(time.Second): // 1秒后还没有执行完成
		t.Fatalf("Do hangs")
	}
}

// runtime.Goexit() 用于终止调用它的 goroutine
// 该函数不返回任何值。当执行此函数时,goroutine 将立即停止执行
// 但是并不会影响其他的 goroutine
// 随后,程序将恢复执行其他的 goroutine
// 如果主 goroutine 执行了这个函数,所有的 goroutine 将停止
//
// 这个函数通常在需要提前结束一个 goroutine 或者在特定条件下需要关闭 goroutine 时使用
// 但通常情况下,通过 return 或者到达函数尾部来结束一个 goroutine
// 使用 runtime.Goexit() 非常罕见
//
func TestGoexitDo(t *testing.T) {
	var g sg.Group
	fn := func() (interface{}, error) {
		fmt.Println("[fn] before goexit")
		runtime.Goexit()
		return nil, nil
	}

	const n = 5
	waited := int32(n)
	done := make(chan struct{})
	for i := 0; i < n; i++ {
		go func() {
			var err error
			defer func() {
				if err != nil {
					t.Errorf("Error should be nil, but got: %v", err)
				}
				tmp := atomic.AddInt32(&waited, -1)
				fmt.Println("waited", tmp)
				if tmp == 0 {
					close(done)
				}
			}()
			// g中使用匿名函数执行fn,所以当fn中直接runtime.Goexit(),则该协程被kill
			// 将不执行下一行打印,而是直接执行defer
			val, err, shared := g.Do("key", fn)
			fmt.Printf("val %v, err %v, shared %v\n", jsonx.ToString(val), err, shared)
		}()
	}

	select {
	case <-done:
		fmt.Println("finish")
	case <-time.After(time.Second):
		t.Fatalf("Do hangs")
	}
}

相关推荐

  1. gosingleflight学习

    2024-03-11 08:18:01       23 阅读
  2. gosingleflight

    2024-03-11 08:18:01       19 阅读
  3. Go语言中同步原语:ErrGroup、Semaphore和SingleFlight

    2024-03-11 08:18:01       37 阅读
  4. goslice学习

    2024-03-11 08:18:01       21 阅读
  5. gofasthttp学习

    2024-03-11 08:18:01       26 阅读
  6. gofasthttp学习~stacklesswriter

    2024-03-11 08:18:01       20 阅读
  7. Go语言GC

    2024-03-11 08:18:01       37 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-11 08:18:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-11 08:18:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-11 08:18:01       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-11 08:18:01       20 阅读

热门阅读

  1. Angular 项目的架构设计

    2024-03-11 08:18:01       21 阅读
  2. 解决eclipse上启动不了tomcat问题

    2024-03-11 08:18:01       18 阅读
  3. NumPy的np.dot函数:计算点积与矩阵乘积的利器

    2024-03-11 08:18:01       25 阅读
  4. 软考笔记--信息系统架构

    2024-03-11 08:18:01       16 阅读
  5. 软考 系统架构设计师之回归及知识点回顾(3)

    2024-03-11 08:18:01       22 阅读
  6. 算法二刷day4

    2024-03-11 08:18:01       23 阅读
  7. 【Vue3】基础

    2024-03-11 08:18:01       21 阅读
  8. Linux下阻塞IO驱动实验实例二的测试

    2024-03-11 08:18:01       23 阅读
  9. List去重的五种方法

    2024-03-11 08:18:01       21 阅读