用go实现一个任务调度类 (泛型)

用go实现一个任务调度类 (泛型)

源码地址:
https://github.com/robinfoxnan/BirdTalkServer/blob/main/server/core/workmanager.go

1.概述

实现了一个简单的任务管理系统,允许用户定义任务和工作者,并将任务分配给工作者进行处理。这个系统旨在提供一个灵活的任务管理框架,可以根据需要动态地添加和移除工作者,以及处理任务。

2.主要功能

  1. 定义了 Task 接口和 Worker 接口,用于表示任务和工作者;
  2. 提供了基础的任务类型 BaseTask 和基础的工作者类型 BaseWorker,用户可以基于这些基础类型来实现自定义的任务和工作者。需要在 BaseTask结构上继承一个新的结构,并实现Process方法;
  3. 实现了一个泛型任务管理器 Manager,用于管理工作者并分配任务给工作者。根据最大工作者个数和任务队列长度,动态地添加工作者。提供了停止所有工作者的方法,提供了方法来等待所有工作者完成任务。

3.类型和接口

3.1Task 任务接口

type Task interface {
	Process()
}

任务接口定义了一个 Process() 方法,用于执行任务的处理逻辑。

3.2Worker 接口

type Worker interface {
	Init(id int64, taskChan chan Task, wg *sync.WaitGroup, f WorkerCleanF)
	Start()
	Stop()
}

工作者接口定义了三个方法:

  • Init() 方法用于初始化工作者。创建后,设置工作者ID,任务通道,同步组,以及一个析构函数类似的清理函数;
  • Start() 方法用于启动工作者协程,开始处理任务;
  • Stop() 方法用于停止工作者;(关闭通道)

3.3BaseTask 结构体

这是一个最基础的示例,后续自定义结构可以包含这个结构:

type BaseTask struct {
	Id int64
}

基础任务结构体包含一个任务 ID,实现了 Task 接口的 Process() 方法,用于执行任务的处理逻辑。

3.4BaseWorker 结构体

type BaseWorker struct {
	Id       int64
	waitGrp  *sync.WaitGroup
	taskChan chan Task
	cleanFun WorkerCleanF
	quitChan chan struct{}
}

基础工作者结构体包含工作者 ID、等待组、任务通道、清理函数和退出通道,实现了 Worker 接口的 Init()Start()Stop() 方法,用于初始化工作者、启动工作者和停止工作者。

4. Manager 结构体

type Manager[T Task, W Worker] struct {
	workers       map[int64]W    // 使用一个map管理各个协程
	maxWorkers    int64          // 最大协程数量
	workerCounter int64          // 使用原子方式计数
	taskChan      chan Task      // 任务通道
	lock          sync.Mutex     // map用的锁
	wg            sync.WaitGroup // 同步组
	newWorkerFunc func() W       // 用于创建泛型中工作者结构的函数
	exiting       int32          // 退出状态标记,防止停止过程中加入任务
	workerIdSeq   int64          // 协程序号,可以用雪花算法代替,一般应该够用
}

任务管理器结构体包含了一个工作者映射、最大工作者数量、工作者计数器、任务通道、互斥锁、等待组、新建工作者函数、退出标志和工作者 ID 序列,提供了方法来添加任务、移除工作者、等待所有工作者完成任务和停止所有工作者。

5. 使用示例

最简单的一个测试示例

    manager := NewManager[Task, *BaseWorker](20, NewBaseWorker)

	// 添加示例任务到管理器
	go func() {
		for i := 0; i < 10; i++ {
			var t = &BaseTask{Id: int64(i)}
			manager.AddTask(t)
		}
	}()

	time.Sleep(time.Minute * 1)
	manager.StopAll()
	// 等待所有工作者完成任务
	manager.Wait()

我们需要重新定义一个结构用于表示任务,通常需要更多的字段

type CustomTask struct {
	BaseTask
	AdditionalInfo string
    // 这里添加更多的字段
}

// 实现 Task 接口的 Run 方法,
// 必须要实现这个函数,这是任务调度的功能入口,在协程中运行
func (t *CustomTask) Process() {
	fmt.Printf("CustomTask with additional info '%s' is running\n", t.AdditionalInfo)
	// 调用父类的 Process 方法
	//t.BaseTask.Process()
}


重写测试:


func TestWorkers(t *testing.T) {
	manager := NewManager[Task, *BaseWorker](20, NewBaseWorker)

	// 添加示例工作者到管理器

	// 添加示例任务到管理器
	go func() {
		for i := 0; i < 10; i++ {
			var t = &BaseTask{Id: int64(i)}
			manager.AddTask(t)
		}
        
		for i := 10; i < 16; i++ {
			var t = &CustomTask{BaseTask: BaseTask{Id: int64(i)}, AdditionalInfo: "Custom Info"}
			manager.AddTask(t)
		}
	}()

	time.Sleep(time.Minute * 1)
	manager.StopAll()
	// 等待所有工作者完成任务
	manager.Wait()
}

结论

各个语言实现的这个轮子基本都差不多。

相关推荐

  1. go实现一个任务调度

    2024-03-25 15:42:04       44 阅读
  2. GO - 编程

    2024-03-25 15:42:04       39 阅读
  3. GO——

    2024-03-25 15:42:04       24 阅读
  4. 14 # 约束

    2024-03-25 15:42:04       53 阅读
  5. Go专家编程——

    2024-03-25 15:42:04       25 阅读
  6. JDBC封装,和反射实现

    2024-03-25 15:42:04       67 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-25 15:42:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-25 15:42:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-25 15:42:04       82 阅读
  4. Python语言-面向对象

    2024-03-25 15:42:04       91 阅读

热门阅读

  1. C++之Const与指针

    2024-03-25 15:42:04       34 阅读
  2. C++之内存分区

    2024-03-25 15:42:04       42 阅读
  3. ChatGPT:开启智能对话,提升论文写作能力

    2024-03-25 15:42:04       35 阅读
  4. 线段树CF 练习题

    2024-03-25 15:42:04       45 阅读
  5. oppo,快手25届暑期实习内推

    2024-03-25 15:42:04       43 阅读
  6. 网络工程师软考中级考试大纲

    2024-03-25 15:42:04       40 阅读
  7. 贪心算法的魅力与应用

    2024-03-25 15:42:04       40 阅读
  8. node.js常用命令

    2024-03-25 15:42:04       35 阅读
  9. 概率论基础概念

    2024-03-25 15:42:04       40 阅读
  10. 多线程面试专题

    2024-03-25 15:42:04       36 阅读
  11. 分布式:这里详细的说一下分布式

    2024-03-25 15:42:04       44 阅读
  12. 分布式简介

    2024-03-25 15:42:04       45 阅读
  13. NPM常用命令详解

    2024-03-25 15:42:04       43 阅读
  14. 使用js判断元素是否在页面最上层且被展示

    2024-03-25 15:42:04       44 阅读