golang 协程池 动态扩缩容

参考 github.com/panjf2000/ants

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	_ "github.com/panjf2000/ants"
)

type pool struct {
	// 协程池最大容量
	cap int32
	// 当前运行的协程个数
	run   int32
	block bool

	// 空闲goroutinue
	idleWorkers []*worker
	// 最大空闲时间,超过就挥手
	idleTimeoutSec uint32

	lock  sync.Mutex
	cond  *sync.Cond
	close chan struct{}
	wg    sync.WaitGroup
}

func NewPool(cap int32) *pool {
	c := &pool{
		cap: cap,
		//idleWorkers: make([]*worker, 0, cap),
		close: make(chan struct{}),
	}
	c.cond = sync.NewCond(&c.lock)
	go c.checkIdleWork()
	return c
}

func (p *pool) checkIdleWork() {
	for {
		select {
		case <-time.After(time.Second * 5):
			{
				p.lock.Lock()
				defer p.lock.Unlock()

				var i = 0
				for ; i < len(p.idleWorkers) &&
					uint32(time.Now().Unix())-p.idleWorkers[i].lastWorkTs >= p.idleTimeoutSec; i++ {
				}

				fmt.Printf("checkIdleWork del=%d\n", i)
				if i > 0 {
					for j := 0; j < i; j++ {
						atomic.AddInt32(&p.run, -1)
						p.idleWorkers[j].task <- nil
					}

					p.idleWorkers = p.idleWorkers[i:]
					fmt.Printf("checkIdleWork now run=%d, del=%d\n", len(p.idleWorkers), i)
				}
			}
		}
	}
}

func (p *pool) Done() {
	p.wg.Done()
}

func (p *pool) Close() {
	fmt.Printf("pool Close\n")
	close(p.close)
	p.wg.Wait()
}

func (p *pool) backPool(w *worker) {
	p.lock.Lock()
	p.idleWorkers = append(p.idleWorkers, w)
	p.lock.Unlock()
	p.cond.Signal()
	fmt.Printf("backPool\n")
}

func (p *pool) Submit(task func()) error {

	p.lock.Lock()
	if len(p.idleWorkers) > 0 {
		work := p.idleWorkers[len(p.idleWorkers)-1]
		p.idleWorkers = p.idleWorkers[:len(p.idleWorkers)-1]
		work.task <- task
		p.lock.Unlock()
		fmt.Printf("use idle\n")
		return nil
	}
	p.lock.Unlock()

	if p.run < p.cap {
		fmt.Printf("use new\n")
		work := &worker{
			pool: p,
			task: make(chan func()),
		}

		p.wg.Add(1)
		work.run()
		atomic.AddInt32(&p.run, 1)

		work.task <- task
		return nil

	} else {

	loop:
		if p.block {
			return fmt.Errorf("pool max")
		}

		fmt.Printf("pool max, wait\n")
		p.lock.Lock()
		p.cond.Wait()

		if len(p.idleWorkers) > 0 {
			work := p.idleWorkers[len(p.idleWorkers)-1]
			p.idleWorkers = p.idleWorkers[:len(p.idleWorkers)-1]
			work.task <- task
			p.lock.Unlock()
			fmt.Printf("use idle\n")
			return nil
		}
		p.lock.Unlock()

		if p.run < p.cap {
			work := &worker{
				pool: p,
				task: make(chan func()),
			}
			p.wg.Add(1)
			work.run()
			atomic.AddInt32(&p.run, 1)
			work.task <- task

			fmt.Printf("use new\n")
			return nil
		}

		goto loop

		return nil
	}
}

type worker struct {
	pool       *pool
	task       chan func()
	lastWorkTs uint32
}

func (w *worker) run() {

	go func() {
		defer w.pool.Done()

		for {
			select {
			case <-w.pool.close:
				return
			case taskFunc := <-w.task:
				if taskFunc == nil {
					return
				}
				taskFunc()
				w.lastWorkTs = uint32(time.Now().Unix())
				w.pool.backPool(w)
			}
		}
	}()
}

func main() {

	p := NewPool(20)

	for i := 0; i < 5; i++ {

		p.Submit(func() {
			func(idx int) {
				fmt.Printf("work %d ..\n", idx)
				time.Sleep(time.Second * 4)
			}(i)
		})

	}

	time.Sleep(time.Second * 10)
	p.Close()
}

相关推荐

  1. golang 动态

    2024-04-09 17:22:03       34 阅读
  2. golang 实现

    2024-04-09 17:22:03       64 阅读
  3. zookeeper动态(无需重启)

    2024-04-09 17:22:03       42 阅读
  4. Golangants使用笔记

    2024-04-09 17:22:03       63 阅读
  5. Lua

    2024-04-09 17:22:03       39 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-09 17:22:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-09 17:22:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-04-09 17:22:03       87 阅读
  4. Python语言-面向对象

    2024-04-09 17:22:03       96 阅读

热门阅读

  1. 谷粒商城学习日志

    2024-04-09 17:22:03       33 阅读
  2. 蓝桥杯刷题 深度优先搜索-[2410]最大连通(C++)

    2024-04-09 17:22:03       34 阅读
  3. ChopticsDriver调用说明

    2024-04-09 17:22:03       37 阅读
  4. [安卓逆向]常见调试和反调试及解决方案

    2024-04-09 17:22:03       37 阅读
  5. Redis 常见面试题

    2024-04-09 17:22:03       33 阅读
  6. Arrays类

    Arrays类

    2024-04-09 17:22:03      36 阅读
  7. 系统设计之订单系统中如何防止商品超卖

    2024-04-09 17:22:03       39 阅读
  8. Vue Router的介绍与引入

    2024-04-09 17:22:03       34 阅读
  9. 自动化测试岗面试问题整理

    2024-04-09 17:22:03       29 阅读
  10. 基于单片机的机械手臂控制系统设计

    2024-04-09 17:22:03       30 阅读