简单剖析tRPC-Go中使用的第三方协程池ants

tRPC-Go中的tRPC.Go()方法使用了ants协程池,做个简单剖析

panjf2000/ants协程池

在tRPC.Go方法(异步启动goroutine)中看到里面使用了ants协程池去实现(具体位置:g.pool.Invoke(p)

前置知识:

我们想异步完成一个任务,首先创建一个任务,然后需要从协程池(PoolWithFunc)中获取worker(goWorkerWithFunc),假设目前队列为空,这时一个worker和一个goroutine会一起创建出来,可以认为他俩就是绑一起的,然后处理完这个任务后,处于当前goroutine中的worker会放入全局的队列中,等待被其他协程去获取这个worker。

于是深入探索了下pool.Invoke等方法:

// github.com/panjf2000/ants/v2@v2.4.6/pool_func.go:162
// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {// 这个func的caller:p就是一个协程池对象
	if p.IsClosed() {
		return ErrPoolClosed
	}
	var w *goWorkerWithFunc
	if w = p.retrieveWorker(); w == nil {// <----
		return ErrPoolOverload
	}
	w.args <- args // 这里展现了channel的异步通信能力:告知,具体可往下看**callback**
	return nil
}

首先执行p.retrieveWorker()获取一个worker,这个方法里面会根据当前worker队列的数量去做不同的逻辑:

  1. 若队列为空,表示无可用的worker,则需要新建(其中包含使用原生go去启动新协程)
// 会执行下面这个函数
spawnWorker := func() {
		w = p.workerCache.Get().(*goWorkerWithFunc)// workerCache是个sync.pool,从中获取goWorkerWithFunc对象
		w.run()// 启动一个 Goroutine 来重复执行传入的函数
	}
// worker 的结构如下:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and performs function calls.
type goWorkerWithFunc struct {
	// pool who owns this worker.
	pool *PoolWithFunc // 该 worker 所在协程池对象的指针
	// args is a job should be done.
	args chan interface{} // 待执行的任务
	// recycleTime will be update when putting a worker back into queue.
	recycleTime time.Time
}

为什么说这个Goroutine可以重复执行传入的函数?

答案:使用for循环不断获取无缓冲channel中的对象,获取不到则阻塞,直到管道关闭为止。

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorkerWithFunc) run() {
	w.pool.incRunning()
	go func() {
		defer func() {
			// ...
		}()
		for args := range w.args {// w.args类型是chan any,用for循环不断获取无缓冲channel中的对象,获取不到则阻塞,直到管道关闭为止
			if args == nil {
				return
			}
			w.pool.poolFunc(args)// args对象就是每个传入的任务,poolFunc是这个协程池对象的执行任务的接口func
			if ok := w.pool.revertWorker(w); !ok {// 将该worker放入队列中,供其他人后续获取这个worker并利用当前这个协程去执行任务
				return
			}
		}
	}()
}

所以说:新开的这个协程会一直处于for循环中不断等待并执行新的任务。

callbackg.pool.Invoke(p)方法中的w.args <- args就是用来通知某个任务协程中的for args := range w.args去执行新的任务。

这里的p和args对象就是一个任务,结构如下:

p := &goerParam{
    ctx:     newCtx,// context.Context
    cancel:  cancel,// context.CancelFunc
    handler: handler,// func(context.Context) 调用方传入的闭包函数
}

sync.pool

上面可以看到 p.workerCache.Get().(*goWorkerWithFunc)中,任务p包含了一个workerCache属性,它是sync.pool类型,一个并发安全的对象池。说明worker都保存在一个对象池中,目的是减少内存分配和垃圾回收的开销。

有待深入…

相关推荐

  1. 简单剖析tRPC-Go使用ants

    2024-06-17 01:18:04       9 阅读
  2. Go实现简单(通过channel实现)

    2024-06-17 01:18:04       19 阅读
  3. Golangants使用笔记

    2024-06-17 01:18:04       37 阅读
  4. 面:go能不能手写一个简单

    2024-06-17 01:18:04       11 阅读
  5. go实现

    2024-06-17 01:18:04       17 阅读
  6. Go语言使用

    2024-06-17 01:18:04       34 阅读
  7. python使用

    2024-06-17 01:18:04       18 阅读
  8. go

    2024-06-17 01:18:04       5 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-17 01:18:04       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-17 01:18:04       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-17 01:18:04       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-17 01:18:04       18 阅读

热门阅读

  1. Opencv无法自动补全

    2024-06-17 01:18:04       7 阅读
  2. 15分钟面试被5连CALL,你扛得住么?

    2024-06-17 01:18:04       8 阅读
  3. SSH error : no kex alg message

    2024-06-17 01:18:04       8 阅读
  4. Spring (60)Spring WebFlux

    2024-06-17 01:18:04       9 阅读
  5. 数据结构之B树的原理与业务场景

    2024-06-17 01:18:04       9 阅读
  6. Autosar实践——诊断配置(DaVinci Configuration)

    2024-06-17 01:18:04       7 阅读
  7. 2024.06.16 刷题日记

    2024-06-17 01:18:04       4 阅读
  8. linux发展历程

    2024-06-17 01:18:04       6 阅读
  9. atcoder ABC 358-B题详解

    2024-06-17 01:18:04       7 阅读
  10. Qt中的事件循环

    2024-06-17 01:18:04       6 阅读
  11. Linux各目录的作用

    2024-06-17 01:18:04       7 阅读