golang sync pool

sync.Pool是内置对象池技术,可用于缓存临时对象,避免因频繁建立临时对象所带来的消耗以及对GC造成的压力

在很多知名框架中都可以看到sync.Pool的大量使用。比如Gin中用sync.Pool来复用每个请求都会创建的gin.Context对象

但是值得注意的是sync.Pool缓存的对象可能被无通知的清理

基本用法

sync.Pool在初始化时,需要用户提供对象构造函数New。用户使用Get来从对象池中获取对象,然后使用Put将对象归还对象池。

type Item struct {
	A int
}

func TestUsePool(t *testing.T) {
	pool:=sync.Pool{
		New: func() interface{} {
			return &Item{
				A:1,
			}
		},
	}
	i:=pool.Get().(*Item)
	fmt.Println(i.A)

	i1:=&Item{
		A: 2,
	}
	pool.Put(i1)
	i11:=pool.Get().(*Item)
	fmt.Println(i11.A)
}

底层实现

以下基于go-1.16

在Golang的GMP调度中,同一时间一个M(系统线程)上只能运行一个P。也就是说,从线程维度来看,在P上的逻辑都是单线程执行的。

sync.Pool就是充分利用了GMP这一特点。对于同一个sync.Pool,每个P都有一个自己的本地对象池poollocal

type pool struct{
  // 禁止拷贝检测方法
  noCopy noCopy
  
  // 元素类型为poolLocal的数组。储存各个P对应本地对象池
  local unsafe.Pointer // local fixed-size per-P pool
  // local数组长度
  localSize uintptr // size of the local array
  // 上一轮清理前的对象池
  victim unsafe.Pointer // local from previous cycle
  victimSize uintptr // size of victims array
  
  // 创建对象的方法
  New func() interface{}
}
type poolLocal struct{
  poolLocalInternal
  pad	[128-unsafe.SizeOf(poolLocalInternal{}%128)]byte
}

type poolLocalInternal struct{
  // Get、Put操作优先存取private变量
  private interface{}
  // p本地对象池
  shared poolChain
}

poolChain实现

// 池中的双端队列
type poolDequeue struct {
  // 储存队列的头、尾
	headTail uint64
  // 队列元素
	vals []eface
}

// 链节点
type poolChainElt struct {
	poolDequeue
	next, prev *poolChainElt
}

// 池链,指向头尾节点
type poolChain struct {
	head *poolChainElt
	tail *poolChainElt
}

poolChain由图及代码可知是链表+ring buffer的结构。其中采用ringBuffer的理由如下

  • 预先分配内存(可能是为了在put的时候省去内存分配消耗),且分配内存项可不断复用
  • ringBuffer实质上是数组,是连续内存结构,非常利用CPU Cache。在访问poolDequeue某一项,其附近数据项都有可能统一加载到Cache Line,访问速度更快

另一个值得注意的点是head与tail居然并不是独立两个变量。而使用一个64位变量,前32位为head,后32位为tail。

这种打包操作是常见的lock free优化手段

lock free是在多线程情况下访问共有内存时不阻塞彼此的编程手段

对于poolDequeue来说,可能会被多个P同时访问,那么比如在ring buffer仅剩一个时,head-tail==1,同时访问,可能两个P都能获取到对象,而这并不符合预期。

所以采用了CAS操作,是多个P都可能拿到对象,但只有一个P调用CAS成功

Put

Put方法将对象放入池中,按照以下顺序优先放入

  1. 当前P对应的本地缓存池的私有对象
  2. 当前P对应的本地缓存池的共享链表
func (p *pool) Put(x interface{}) {
	if x == nil {
		return
	}

	// 获取池中当前P对应的本地缓存池
	l,_ := p.pin()

	// 优先设置private,若成功,将不会写入shared池
	if l.private == nil {
		l.private = x
		x = nil
	}

	// 推入对象到当前P对应的本地缓存池共享链表
	if x != nil {
		l.shared.pushHead(x)
	}

	runtime_procUnpin()
}
pushHead

pushHead将对象推入链头部

func (c *poolChain) pushHead(val any) {
  // 若链头节点为空,则初始化
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

  // 将元素推入头节点双向队列中
	if d.pushHead(val) {
		return
	}

  // 若当前双向队列满,则分配两倍于原队列的新队列
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

  // 使新队列为链头,并插入对象到新队列中
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

pin

pin方法主要用于

  • 初始化或者重新创建local数组。当local数组为空,或者与当前runtime.GOMAXPROCS不一致,就会触发重新创建local数组以和P数量一致
  • 从当前P中取对应的本地缓存池poolLocal
  • 防止当前P被抢占。
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
  // 获取当前P的id,并禁止抢占
	pid := runtime_procPin()

  // 若池的本地缓存池数量大于pid,说明P数量没有变化,可以直接取P所对应的本地缓存池
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
    // 为什么是以pid去获取所在local的位置呢
		return indexLocal(l, pid), pid
	}
  
	return p.pinSlow()
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}
pinSlow
func (p *Pool) pinSlow() (*poolLocal, int) {
  // 取消P的禁止抢占,使能加上全局池锁
	runtime_procUnpin()
  // 全局池加锁后,先再次尝试直接获取
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
  
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
  
  // 若池的本地池为空,添加到全局中
	if p.local == nil {
		allPools = append(allPools, p)
	}
  
	// 新建与当前P数量一致的本地缓存池,并返回当前P的本地缓存池
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid
}

runtime_procPin

runtime_procPin是procPin的封装,主要是为了防止P被抢占以及返回P id

func procPin() int {
  // 先获取当前goroutine
	_g_ := getg()
  
  // 接着获取goroutine绑定的系统线程,并对该线程加锁
  // 加锁之后P便不会被抢占,使得不会被GC
	mp := _g_.m
	mp.locks++
  
  // 返回系统线程绑定的P id
	return int(mp.p.ptr().id)
}

Get

获取对象的顺序如下

  1. 当前P对应的本地缓存池的私有对象
  2. 当前P对应的本地缓存池的共享链表
  3. 其他P对应的本地缓存池的共享链表
  4. 上轮GC幸存缓存池私有对象和共享链表
  5. New方法构造
func (p *pool) Get() interface{} {
	l, pid := p.pin()

  // 优先尝试获取当前P对应的本地缓存池的私有对象
	x := l.private
	l.private = nil

	if x == nil {
    // 接着尝试当前P对应的本地缓存池的共享链表的头节点
		x,_ = l.shared.popHead()

		// 当无法从当前p缓存池获取数据,就会尝试从其他P缓存池获取
		// 对于其他p的poolChain会调用popTail
		// 若其他p也没有,那就尝试从victim中取数据
		if x == nil {
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()

  // 都获取不到的情况,重新构造
	if x == nil && p.New != nil {
		x = p.New()
	}
	
	return x
}

getSlow是先从其他P窃取,然后从victim缓存中获取

func (p *Pool) getSlow(pid int) any {
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
  
  // 尝试从其他P池窃取对象
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
    // 仅被允许获取其他P的尾部对象
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

  // 若未从其他P窃取到对象,还可以从上轮GC遗留的本地池中获取
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 获取不到,说明无幸存者
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

poolCleanup

poolCleanup在GC之前将pool清空,通过victim将回收拆为了两步,防止GC大量清理导致的抖动

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

func poolCleanup() {
	// 清理oldPools上的幸存对象
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// 迁移池本地缓存到池victim
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// 全局池迁移到oldPools
	oldPools, allPools = allPools, nil
}

Ref

  1. https://www.cyhone.com/articles/think-in-sync-pool/
  2. https://www.cnblogs.com/gaochundong/p/lock_free_programming.html
  3. https://zhuanlan.zhihu.com/p/99710992

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-28 18:30:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-28 18:30:02       101 阅读
  3. 在Django里面运行非项目文件

    2024-04-28 18:30:02       82 阅读
  4. Python语言-面向对象

    2024-04-28 18:30:02       91 阅读

热门阅读

  1. 陕西省工程系列高级职称评审申报指南

    2024-04-28 18:30:02       34 阅读
  2. Viewpage+TabLayout+Fragment常用功能实现

    2024-04-28 18:30:02       35 阅读
  3. sql将日期区间拆分为多行

    2024-04-28 18:30:02       30 阅读
  4. 代码随想录训练营25day-贪心算法3

    2024-04-28 18:30:02       35 阅读
  5. 常用的启发式算法及其应用

    2024-04-28 18:30:02       30 阅读
  6. 【NC16619】传球游戏

    2024-04-28 18:30:02       28 阅读
  7. Gromacs——教程学习(4)

    2024-04-28 18:30:02       33 阅读
  8. 通过ffmpeg 下载在线的.m3u8格式视频

    2024-04-28 18:30:02       28 阅读
  9. OSD图像技术

    2024-04-28 18:30:02       32 阅读