需求背景
在项目中经常会遇到这样的业务场景:用户下单后没有立即支付,如果超过一定时间仍未支付,系统就自动取消订单。针对这种延时处理的场景,有很多实现方法。可以用MQ来实现,但对于一个小项目来说,引入MQ的成本有时会偏高,这时可以改用redis来实现。redis实现延时队列也有多种方案,比如可以使用发布订阅模式来实现,也可以直接用zSet集合来实现。本博客通过zSet集合来实现延时队列,仅供参考。
实现思路
要实现延时队列,可以使用redis zSet的几个命令:生产者生成消息后,以"当前时间戳加上延迟时间"作为score把消息加入队列;消费端先简单地用定时任务轮询,以当前时间戳为上限取出所有已到期的消息并消费,消费之后再把消息移出队列。
- 发送消息,添加到队列里
# key为队列的名称,score为当前的时间戳加上延迟时间,value为消息体
zadd key score value
- 根据当前时间戳获取所有的消息数据
# key为队列的名称,min为0,max为当前的时间戳
zrangebyscore key min max
- 消费了,就移出队列
# key为队列的名称,value为消息体
zrem key value
实验环境
JDK 1.8
SpringBoot 2.2.1
Maven 3.2+
Mysql 8.0.26
redis 6.2.14
开发工具
- IntelliJ IDEA
- smartGit
动手实践
先新建一个SpringBoot项目,使用Spring Initializr
选择需要的依赖,比如Spring Data redis
新建一个application.yml,加上redis的配置
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
新建一个消息实体类
package com.example.delayqueue.core;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Payload stored in the delay queue.
 *
 * Lombok generates the accessors, the no-args and all-args constructors and the builder.
 * NOTE(review): the field declaration order determines the all-args constructor
 * parameter order, and presumably also the property order of the JSON string that
 * RedisDelayQueue uses as the sorted-set member - confirm before reordering fields.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message {
// Identifier of the message.
private String id;
// Message body.
private String value;
// Delay before the message becomes due. RedisDelayQueue.push multiplies this value
// by 1000 and adds it to System.currentTimeMillis(), so it is interpreted as seconds.
private long delayTime;
// Topic label of the message.
// NOTE(review): this is the only non-private field; looks unintentional - confirm.
protected String topic;
}
定义一个延时队列方法的接口类
package com.example.delayqueue.core;
import java.util.List;
/**
 * Minimal contract of a delay queue: enqueue a message, fetch messages, delete a message.
 *
 * NOTE(review): the interface itself does not state when a message counts as "due";
 * that is decided by the implementation - see the implementing class.
 */
public interface DelayQueue {
/**
 * Adds a message to the queue.
 *
 * @param message the message to enqueue
 * @return the implementation-defined result of the add operation
 */
boolean push(Message message);
/**
 * Fetches messages from the queue without removing them; callers delete a message
 * explicitly through {@link #remove(Message)}.
 *
 * @return the fetched messages
 */
List<Message> pull();
/**
 * Deletes a message from the queue.
 *
 * @param message the message to delete
 * @return presumably {@code true} when something was actually deleted - confirm against the implementation
 */
boolean remove(Message message);
}
通过redis实现延时队列,具体的实现类
package com.example.delayqueue.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.ConvertException;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@Component
public class RedisDelayQueue implements DelayQueue {
private static final String DELAY_QUEUE_NAME = "delay_queue";
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public boolean push(Message message) {
long score = System.currentTimeMillis() + message.getDelayTime() * 1000;
String msg = JSONUtil.toJsonStr(message);
return redisTemplate.opsForZSet().add(DELAY_QUEUE_NAME, msg, score);
}
@Override
public boolean remove(Message message) {
String msg = JSONUtil.toJsonStr(message);
Long remove = redisTemplate.opsForZSet().remove(DELAY_QUEUE_NAME, msg);
return remove > 0 ? true : false;
}
@Override
public List<Message> pull() {
List<Message> msgList = CollUtil.newArrayList();
try {
Set<String> stringSet = Optional.ofNullable(redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_NAME, 0, System.currentTimeMillis())).orElse(CollUtil.newHashSet());
msgList = stringSet.stream().map(str -> {
Message message = null;
try {
message = JSONUtil.toBean(str, Message.class);
} catch (ConvertException e) {
log.error("toBean exception:{}", e);
}
return message;
}).collect(Collectors.toList());
} catch (Exception e) {
log.error("exception:{}", e);
}
return msgList;
}
}
消息生产者,生成消息:
package com.example.delayqueue.service;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.example.delayqueue.core.Message;
import com.example.delayqueue.core.RedisDelayQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Producer side of the delay queue: logs each outgoing message and hands it to the queue.
 */
@Service
@Slf4j
public class MessageProducer {

    @Autowired
    private RedisDelayQueue redisDelayQueue;

    /**
     * Logs the message as JSON together with the current time, then enqueues it.
     *
     * @param message the message to enqueue
     */
    public void pushMessage(Message message) {
        final String payload = JSONUtil.toJsonStr(message);
        final String pushedAt = DateUtil.now();
        log.info("push message:{},now:{}", payload, pushedAt);
        redisDelayQueue.push(message);
    }
}
消息消费者,暂时用定时任务实现
package com.example.delayqueue.service;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.example.delayqueue.core.Message;
import com.example.delayqueue.core.RedisDelayQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Service
@Slf4j
public class MessageConsumer implements InitializingBean {
private ExecutorService executorService;
@Autowired
private RedisDelayQueue redisDelayQueue;
@Override
public void afterPropertiesSet() throws Exception {
executorService = new ThreadPoolExecutor(
20,30,60L, TimeUnit.SECONDS
,new ArrayBlockingQueue<>(100),new MyThreadFactory());
}
@Scheduled(cron = "*/3 * * * * * ")
public void consumer() {
log.info("ready consumer...");
executorService.execute(() -> {
List<Message> messageList = Optional.ofNullable(redisDelayQueue.pull()).orElse(CollUtil.newArrayList());
if (CollUtil.isNotEmpty(messageList)) {
messageList.stream().forEach(message -> {
log.info("consumer {},consumer time:{}", JSONUtil.toJsonStr(message), DateUtil.now());
redisDelayQueue.remove(message);
});
}
});
}
class MyThreadFactory implements ThreadFactory{
final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setName("thread-"+threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
}
}
项目跑起来,加上一个测试类
package com.example.delayqueue.controller;
import cn.hutool.core.util.IdUtil;
import com.example.delayqueue.core.Message;
import com.example.delayqueue.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo endpoint used to try out the delay queue by hand.
 */
@RestController
public class SampleController {

    @Autowired
    private MessageProducer messageProducer;

    /**
     * Builds a message with a random id and body, topic {@code testTopic} and a delay of 3,
     * then passes it to the producer.
     */
    @GetMapping("test")
    public void test() {
        String messageId = IdUtil.fastSimpleUUID();
        String body = "msg" + IdUtil.fastSimpleUUID();
        Message outgoing = Message.builder()
                .topic("testTopic")
                .delayTime(3)
                .id(messageId)
                .value(body)
                .build();
        messageProducer.pushMessage(outgoing);
    }
}
SpringBoot启动类
package com.example.delayqueue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Application entry point; scheduling is enabled so that {@code @Scheduled} methods run.
 */
@SpringBootApplication
@EnableScheduling
public class SpringbootRedisDelayQueueApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringbootRedisDelayQueueApplication.class);
        application.run(args);
    }
}