go的fasthttp学习~stackless的writer

代码

package stackless

import (
	"errors"
	"fmt"
	"io"
	"sync"

	"github.com/valyala/bytebufferpool"
)

// Writer is an interface stackless writer must conform to.
// Writer 是 Stackless writer 必须遵守的接口
// The interface contains common subset for Writers from compress/* packages.
type Writer interface {
	Write(p []byte) (int, error)
	Flush() error
	Close() error
	Reset(w io.Writer)
}

// NewWriterFunc must return new writer that will be wrapped into
// stackless writer.
type NewWriterFunc func(w io.Writer) Writer

// NewWriter creates a stackless writer around a writer returned
// from newWriter.
//
// The returned writer writes data to dstW.
// 返回的Writer将数据写入到dstW
// 使用大量栈空间的Writers被封装为stackless writer,这样可以支持更高的并发
// Writers that use a lot of stack space may be wrapped into stackless writer,
// thus saving stack space for high number of concurrently running goroutines.
//
// newWriter的意思是把内部结构体writer的私有变量 xw xWriter 通过外部的方法转换为一个stackless的成员
// 然后让内部结构体writer的zw指向它
// 后续都是通过w.zw来操作写入
func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {
	w := &writer{
		dstW: dstW,
	}
	w.zw = newWriter(&w.xw)
	return w
}

type writer struct {
	dstW io.Writer
	zw   Writer
	xw   xWriter

	err error
	n   int

	p  []byte
	op op
}

type op int

const (
	opWrite op = iota
	opFlush
	opClose
	opReset
)

func (w *writer) Write(p []byte) (int, error) {
	w.p = p
	err := w.do(opWrite)
	w.p = nil
	return w.n, err
}

func (w *writer) Flush() error {
	return w.do(opFlush)
}

func (w *writer) Close() error {
	return w.do(opClose)
}

func (w *writer) Reset(dstW io.Writer) {
	// 归还writer中的bytebufferpool.ByteBuffer
	w.xw.Reset()
	// 初始化writer中的zw指向的stackless的内容
	w.do(opReset) //nolint:errcheck
	w.dstW = dstW
}

func (w *writer) do(op op) error {
	w.op = op
	if !stacklessWriterFunc(w) {
		return errHighLoad
	}
	err := w.err
	if err != nil {
		return err
	}
	// 如果 w.xw 的 bb 中还有数据,则将其写入dstW
	if w.xw.bb != nil && len(w.xw.bb.B) > 0 {
		_, err = w.dstW.Write(w.xw.bb.B)
	}
	// 更新xw
	w.xw.Reset()

	return err
}

var errHighLoad = errors.New("cannot compress data due to high load")

var (
	stacklessWriterFuncOnce sync.Once
	stacklessWriterFuncFunc func(ctx any) bool
)

func stacklessWriterFunc(ctx any) bool {
	stacklessWriterFuncOnce.Do(func() {
		stacklessWriterFuncFunc = NewFunc(writerFunc)
	})
	// 第一次执行stacklessWriterFuncFunc时
	// 此时macProc个子协程已经就绪,等待writerFunc执行代码
	return stacklessWriterFuncFunc(ctx)
}

// 多协程等待执行,err存储到 w 中,外部会读取
func writerFunc(ctx any) {
	w := ctx.(*writer)
	switch w.op {
	case opWrite:
		w.n, w.err = w.zw.Write(w.p)
	case opFlush:
		w.err = w.zw.Flush()
	case opClose:
		w.err = w.zw.Close()
	case opReset:
		w.zw.Reset(&w.xw) // zw内部reset的时候,需要将w.xw的位置也重新赋值上
		w.err = nil
	default:
		panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))
	}
}

type xWriter struct {
	bb *bytebufferpool.ByteBuffer
}

func (w *xWriter) Write(p []byte) (int, error) {
	if w.bb == nil {
		w.bb = bufferPool.Get()
	}
	return w.bb.Write(p)
}

func (w *xWriter) Reset() {
	if w.bb != nil {
		bufferPool.Put(w.bb)
		w.bb = nil
	}
}

var bufferPool bytebufferpool.Pool

单测

package stackless

import (
	"bytes"
	"compress/flate"
	"compress/gzip"
	"fmt"
	"io"
	"testing"
	"time"
)

func TestCompressFlateSerial(t *testing.T) {
	t.Parallel()

	if err := testCompressFlate(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}

func TestCompressFlateConcurrent(t *testing.T) {
	t.Parallel()

	if err := testConcurrent(testCompressFlate, 10); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}

func testCompressFlate() error {
	// 第一个参数是一个函数,将入参 w io.Writer 转换为 flate.Writer
	// 第二个参数是一个函数,将入参 r io.Reader 转换为 flate.Reader
	return testWriter(func(w io.Writer) Writer {
		// 这里是将 stackless的 xwriter 转换为 flate.Writer
		zw, err := flate.NewWriter(w, flate.DefaultCompression)
		if err != nil {
			panic(fmt.Sprintf("BUG: unexpected error: %v", err))
		}
		return zw
	}, func(r io.Reader) io.Reader {
		return flate.NewReader(r)
	})
}

func TestCompressGzipSerial(t *testing.T) {
	t.Parallel()

	if err := testCompressGzip(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}

func TestCompressGzipConcurrent(t *testing.T) {
	t.Parallel()

	if err := testConcurrent(testCompressGzip, 10); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}

func testCompressGzip() error {
	return testWriter(func(w io.Writer) Writer {
		return gzip.NewWriter(w)
	}, func(r io.Reader) io.Reader {
		zr, err := gzip.NewReader(r)
		if err != nil {
			panic(fmt.Sprintf("BUG: cannot create gzip reader: %v", err))
		}
		return zr
	})
}

// 第一个参数是一个函数,将入参 w io.Writer 包裹为 flate.Writer
// 第二个参数是一个函数,将入参 r io.Reader 包裹为 flate.Reader
func testWriter(newWriter NewWriterFunc, newReader func(io.Reader) io.Reader) error {
	dstW := &bytes.Buffer{}
	// 生成一个stackless的Writer w
	// 并调用newWriter进行初始化:w.zw = newWriter(&w.xw) w.xw是一个xWriter
	// type xWriter struct {
	//	  bb *bytebufferpool.ByteBuffer
	// }
	//
	// 说白了就是将 w.xw 作为flate.Writer的底层writer来制作一个新的flate.Writer,并让 w.zw 指向这个 flate.Writer
	// 利用stackless的Writer去操作自己的w.zw,进而实现flate.Writer操作w.xw的目的
	w := NewWriter(dstW, newWriter)

	for i := 0; i < 5; i++ {
		// 这里明明已经将dstW赋值给stackless的writer了,又将其作为参数传入
		// 是为了通过dstW读取值,判断写入writer的内容是对的
		if err := testWriterReuse(w, dstW, newReader); err != nil {
			return fmt.Errorf("unexpected error when re-using writer on iteration %d: %w", i, err)
		}
		dstW = &bytes.Buffer{}
		w.Reset(dstW)
	}

	return nil
}

func testWriterReuse(w Writer, r io.Reader, newReader func(io.Reader) io.Reader) error {
	wantW := &bytes.Buffer{}
	// creates a writer that duplicates its writes to all the provided writers
	// similar to the Unix tee(1) command
	// 写入多个writer,如果一个失败,则全部返回失败
	mw := io.MultiWriter(w, wantW)
	for i := 0; i < 30; i++ {
		fmt.Fprintf(mw, "foobar %d\n", i)
		// 屏蔽掉也没问题
		if i%13 == 0 {
			// 实质是对 w.zw的Flush()
			// if err := w.Flush(); err != nil {
			// 	return fmt.Errorf("error on flush: %w", err)
			// }
		}
	}
	// 实质是对 w.zw的Close()
	// 没有Close和Flush的话,数据不会被刷入stackless的xWriter中
	w.Close()

	zr := newReader(r)
	data, err := io.ReadAll(zr)
	if err != nil {
		return fmt.Errorf("unexpected error: %w, data=%q", err, data)
	}

	// 从两个writer中读出来的结果相同
	wantData := wantW.Bytes()
	if !bytes.Equal(data, wantData) {
		return fmt.Errorf("unexpected data: %q. Expecting %q", data, wantData)
	}

	return nil
}

func testConcurrent(testFunc func() error, concurrency int) error {
	ch := make(chan error, concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			ch <- testFunc()
		}()
	}
	for i := 0; i < concurrency; i++ {
		select {
		case err := <-ch:
			if err != nil {
				return fmt.Errorf("unexpected error on goroutine %d: %w", i, err)
			}
		case <-time.After(time.Second):
			return fmt.Errorf("timeout on goroutine %d", i)
		}
	}
	return nil
}

// 到底什么是flush
// Flush flushes any pending data to the underlying writer.
// 将任何挂起的数据写入底层的writer (underlying writer)
// It is useful mainly in compressed network protocols, to ensure that
// a remote reader has enough data to reconstruct a packet.
// 在压缩网络协议(compressed network protocols)中很有用
// 确保一个远端的reader有足够数据去重构一个包
//
// Flush does not return until the data has been written.
// Calling Flush when there is no pending data still causes the Writer
// to emit a sync marker of at least 4 bytes.
// 发出至少4个字节的同步标记(emit a sync marker)
// If the underlying writer returns an error, Flush returns that error.
//
// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.

// flushes and closes the writer.
// close前先flushes writer
//
// func (w *Writer) Close() error {
//     return w.d.close()
// }

运用


var flateReaderPool sync.Pool

func acquireStacklessGzipWriter(w io.Writer, level int) stackless.Writer {
	nLevel := normalizeCompressLevel(level)
	p := stacklessGzipWriterPoolMap[nLevel]
	v := p.Get()
	if v == nil {
		// stackless.NewWriter 第二个参数是一个函数
		// 用于将一个具体的 *gzip.Writer 赋值给  stackless.Writer 接口
		//
		// newWriter的意思是把内部结构体writer的私有变量 xw xWriter 通过外部的方法转换为一个stackless的成员
		// 然后让内部结构体writer的zw指向它
		// 后续都是通过w.zw来操作写入
		return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
			// acquireRealGzipWriter:从资源池取一个/new一个gzipWriter并reset
			return acquireRealGzipWriter(w, level)
		})
	}
	sw := v.(stackless.Writer)
	sw.Reset(w)
	return sw
}

func releaseStacklessGzipWriter(sw stackless.Writer, level int) {
	sw.Close()
	nLevel := normalizeCompressLevel(level)
	p := stacklessGzipWriterPoolMap[nLevel]
	p.Put(sw)
}

// acquireRealGzipWriter
// 从资源池取一个/new一个gzipWriter并reset
func acquireRealGzipWriter(w io.Writer, level int) *gzip.Writer {
	nLevel := normalizeCompressLevel(level)
	p := realGzipWriterPoolMap[nLevel]
	v := p.Get()
	if v == nil {
		zw, err := gzip.NewWriterLevel(w, level)
		if err != nil {
			// gzip.NewWriterLevel only errors for invalid
			// compression levels. Clamp it to be min or max.
			if level < gzip.HuffmanOnly {
				level = gzip.HuffmanOnly
			} else {
				level = gzip.BestCompression
			}
			zw, _ = gzip.NewWriterLevel(w, level)
		}
		return zw
	}
	zw := v.(*gzip.Writer)
	zw.Reset(w)
	return zw
}

总结

如此精妙的设计,真的太厉害了
利用stackless的NewWriter,来实现接管一个真正的复杂的高层Writer
复杂的高层Writer利用stackless的writer提供的xWriter来做它的底层io,将编码后的数据存入里面
stackless的Flush和Close其实就是接管的复杂的高层Writer的这两,
最终stackless的worker将上述结果存入 dstWriter 中

相关推荐

  1. gofasthttp学习stacklesswriter

    2024-03-17 21:48:01       38 阅读
  2. gofasthttp学习

    2024-03-17 21:48:01       39 阅读
  3. gosingleflight学习

    2024-03-17 21:48:01       48 阅读
  4. goslice学习

    2024-03-17 21:48:01       41 阅读
  5. PostgreSQLfull_page_writes

    2024-03-17 21:48:01       60 阅读
  6. Go语言GC

    2024-03-17 21:48:01       56 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-17 21:48:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-17 21:48:01       101 阅读
  3. 在Django里面运行非项目文件

    2024-03-17 21:48:01       82 阅读
  4. Python语言-面向对象

    2024-03-17 21:48:01       91 阅读

热门阅读

  1. MySQL 索引

    2024-03-17 21:48:01       39 阅读
  2. Qt——智能指针实战

    2024-03-17 21:48:01       45 阅读
  3. spring MVC 自定义注解实现路径匹配

    2024-03-17 21:48:01       42 阅读
  4. qt之画图

    2024-03-17 21:48:01       34 阅读
  5. Spring中的bean相关问题

    2024-03-17 21:48:01       48 阅读
  6. Ts中WebSocket连接管理与维护教程

    2024-03-17 21:48:01       30 阅读