golang实现延迟队列(delay queue)

golang实现延迟队列

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

  1. 定义Task结构体,包含
  • ExecuteTime time.Time
  • Job func()
  1. 定义DelayQueue
  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
   
	ExecuteTime time.Time
	Job         func()
}

type DelayQueue struct {
   
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
   
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
   
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
   
	for len(d.Tasks) > 0 {
   
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
   
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
   
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{
   }
	firstTask := &Task{
   
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
   
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
   
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
   
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:
在这里插入图片描述

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

  1. 初始化redis连接
  2. 延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
   
	redisdb = redis.NewClient(&redis.Options{
   
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
   
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
   
	err := initClient()
	if err != nil {
   
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
   
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
   
		Score:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
   
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
   
	for {
   
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
   
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
   
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
   
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

在这里插入图片描述

相关推荐

  1. kafka实现延迟队列

    2024-02-22 08:18:03       44 阅读
  2. golang实现循环队列

    2024-02-22 08:18:03       38 阅读
  3. golang实现循环队列

    2024-02-22 08:18:03       29 阅读
  4. DelayQueue

    2024-02-22 08:18:03       36 阅读
  5. Redis实现延迟任务队列(一)

    2024-02-22 08:18:03       48 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-22 08:18:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-22 08:18:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-02-22 08:18:03       87 阅读
  4. Python语言-面向对象

    2024-02-22 08:18:03       96 阅读

热门阅读

  1. el-date-picker(日期时间选择)那些事

    2024-02-22 08:18:03       51 阅读
  2. uniapp引入微信小程序直播组件

    2024-02-22 08:18:03       63 阅读
  3. uniapp监听TV电视遥控器的红外按键事件

    2024-02-22 08:18:03       48 阅读
  4. 蓝桥杯基础知识点9 stack、queue、priority_queue

    2024-02-22 08:18:03       48 阅读
  5. Hive--删除数据库

    2024-02-22 08:18:03       48 阅读
  6. 过滤器(Filter)

    2024-02-22 08:18:03       44 阅读
  7. Hive JDBC

    Hive JDBC

    2024-02-22 08:18:03      51 阅读