SpringBoot系列之使用Redis实现延时队列

需求背景

在项目中可能会遇到这样的业务场景,下单后,并没有支付,隔多久后,如果用户还没支付,就自动取消订单,针对这种延时支付的场景,有很多实现方法,可以用MQ来实现,但是如果针对一个小项目,要引入MQ,有时候会觉得成本有点大,那可以用redis来实现,redis实现延时队列也有多种方案,比如可以使用发布订阅模式来实现,也可以直接用zSet集合来实现,本博客通过zSet集合来实现延时队列,仅供参考

实现思路

要实现延时队列,可以使用redis zSet的一些命令,比如生产者生成消息,就加入队列里,先简单用定时任务,通过当前的时间戳获取所有的消息,到期的消息自动消费

  • 发送消息,添加到队列里
# key为队列的名称,score为当前的时间戳加上延迟时间,value为消息体
zadd key score value
  • 根据当前时间戳获取所有的消息数据
# key为队列的名称,min为0,max为当前的时间戳
zrangebyscore key min max
  • 消费了,就移出队列
# key为队列的名称,value为消息体
zrem key value

实验环境

  • JDK 1.8

  • SpringBoot 2.2.1

  • Maven 3.2+

  • Mysql 8.0.26

  • redis 6.2.14

  • 开发工具

    • IntelliJ IDEA
    • smartGit

动手实践

先新建一个SpringBoot项目,使用Spring Initializr

在这里插入图片描述

选择需要的依赖,比如Spring Data redis

在这里插入图片描述

新建一个application.yml,加上redis的配置

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0

新建一个消息实体类

package com.example.delayqueue.core;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message {
   

    private String id;

    private String value;

    private long delayTime;

    protected String topic;

}

定义一个延时队列方法的接口类

package com.example.delayqueue.core;

import java.util.List;

public interface DelayQueue {
   

    boolean push(Message message);

    List<Message> pull();

    boolean remove(Message message);

}

通过redis实现延时队列,具体的实现类

package com.example.delayqueue.core;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.ConvertException;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
@Component
public class RedisDelayQueue implements DelayQueue {
   

    private static final String DELAY_QUEUE_NAME = "delay_queue";

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public boolean push(Message message) {
   
        long score = System.currentTimeMillis() + message.getDelayTime() * 1000;
        String msg = JSONUtil.toJsonStr(message);
        return redisTemplate.opsForZSet().add(DELAY_QUEUE_NAME, msg, score);
    }

    @Override
    public boolean remove(Message message) {
   
        String msg = JSONUtil.toJsonStr(message);
        Long remove = redisTemplate.opsForZSet().remove(DELAY_QUEUE_NAME, msg);
        return remove > 0 ? true : false;
    }

    @Override
    public List<Message> pull() {
   
        List<Message> msgList = CollUtil.newArrayList();
        try {
   
            Set<String> stringSet = Optional.ofNullable(redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_NAME, 0, System.currentTimeMillis())).orElse(CollUtil.newHashSet());
            msgList = stringSet.stream().map(str -> {
   
                Message message = null;
                try {
   
                    message = JSONUtil.toBean(str, Message.class);
                } catch (ConvertException e) {
   
                    log.error("toBean exception:{}", e);
                }
                return message;
            }).collect(Collectors.toList());
        } catch (Exception e) {
   
            log.error("exception:{}", e);
        }

        return msgList;
    }
}

消息生产者,生成消息:

package com.example.delayqueue.service;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.example.delayqueue.core.Message;
import com.example.delayqueue.core.RedisDelayQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MessageProducer {
   

    @Autowired
    private RedisDelayQueue redisDelayQueue;

    public void pushMessage(Message message) {
   
        log.info("push message:{},now:{}", JSONUtil.toJsonStr(message), DateUtil.now());
        redisDelayQueue.push(message);
    }

}

消息消费者,暂时用定时任务实现

package com.example.delayqueue.service;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.example.delayqueue.core.Message;
import com.example.delayqueue.core.RedisDelayQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Service
@Slf4j
public class MessageConsumer implements InitializingBean {
   

    private ExecutorService executorService;
    
    @Autowired
    private RedisDelayQueue redisDelayQueue;

    @Override
    public void afterPropertiesSet() throws Exception {
   
        executorService = new ThreadPoolExecutor(
                20,30,60L, TimeUnit.SECONDS
                    ,new ArrayBlockingQueue<>(100),new MyThreadFactory());
    }

    @Scheduled(cron = "*/3 * * * * * ")
    public void consumer() {
   
        log.info("ready consumer...");
        executorService.execute(() -> {
   
            List<Message> messageList = Optional.ofNullable(redisDelayQueue.pull()).orElse(CollUtil.newArrayList());
            if (CollUtil.isNotEmpty(messageList)) {
   
                messageList.stream().forEach(message -> {
   
                    log.info("consumer {},consumer time:{}", JSONUtil.toJsonStr(message), DateUtil.now());
                    redisDelayQueue.remove(message);
                });
            }

        });
    }


    class MyThreadFactory implements ThreadFactory{
   
        final AtomicInteger threadNumber = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r){
   
            Thread t = new Thread(r);
            t.setName("thread-"+threadNumber.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    }
}

项目跑起来,加上一个测试类

package com.example.delayqueue.controller;

import cn.hutool.core.util.IdUtil;
import com.example.delayqueue.core.Message;
import com.example.delayqueue.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SampleController {
   


    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("test")
    public void test() {
   
        Message message = Message.builder()
                .id(IdUtil.fastSimpleUUID())
                .value("msg"+IdUtil.fastSimpleUUID())
                .topic("testTopic")
                .delayTime(3)
                .build();
        messageProducer.pushMessage(message);
    }

}

SpringBoot启动类

package com.example.delayqueue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class SpringbootRedisDelayQueueApplication {
   

    public static void main(String[] args) {
   
        SpringApplication.run(SpringbootRedisDelayQueueApplication.class, args);
    }

}

相关推荐

  1. 使用Redis实现队列

    2024-02-20 15:00:03       33 阅读
  2. redis实现队列

    2024-02-20 15:00:03       27 阅读
  3. Springboot 集成Rabbitmq队列

    2024-02-20 15:00:03       33 阅读
  4. Redis简易队列

    2024-02-20 15:00:03       65 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-20 15:00:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-20 15:00:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-02-20 15:00:03       82 阅读
  4. Python语言-面向对象

    2024-02-20 15:00:03       91 阅读

热门阅读

  1. 基于python+mysql的宠物领养网站系统

    2024-02-20 15:00:03       53 阅读
  2. Python 进阶语法:正则表达式

    2024-02-20 15:00:03       46 阅读
  3. Android app启动优化 2

    2024-02-20 15:00:03       55 阅读
  4. 【算法 - 动态规划】力扣 691. 贴纸拼词

    2024-02-20 15:00:03       55 阅读
  5. typescript type 类型别名详解

    2024-02-20 15:00:03       55 阅读
  6. macad3d解析macad—application,commands,utils

    2024-02-20 15:00:03       41 阅读
  7. unity中UI、shader显示在3D物体前

    2024-02-20 15:00:03       53 阅读
  8. LeetCode 93. 复原 IP 地址

    2024-02-20 15:00:03       56 阅读
  9. yum方式快速安装mysql

    2024-02-20 15:00:03       53 阅读