DelayQueue实现延时任务

DelayQueue

DelayQueue是一个有序的阻塞队列,用于在指定的延迟之后从队列中提取元素。它在调度任务、缓存清除、延迟任务处理等场景中非常有用。

基本概念

DelayQueue位于java.util.concurrent包中,它是一个无界的阻塞队列,元素必须实现Delayed接口。队列中的元素按照它们的到期时间排序,只有到期的元素才能从队列中提取。

Delayed接口

Delayed接口扩展了Comparable接口,要求实现以下方法:

  • long getDelay(TimeUnit unit): 返回元素的剩余延迟时间。
  • int compareTo(Delayed other): 用于比较元素的到期时间。
/**
 * Test类实现了Delayed接口
 * Delayed接口要求实现getDelay和compareTo方法。
 */
public class Test implements Delayed {

    /**
     * 返回元素的延迟时间。
     * @param unit 时间单位
     * @return 延迟时间,单位为指定的时间单位。
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return 0;
    }

    /**
     * 比较两个延迟元素的顺序。
     * @param o 另一个延迟元素
     * @return 比较结果,负数表示当前元素在前,正数表示当前元素在后,0表示相等。
     */
    @Override
    public int compareTo(Delayed o) {
        return 0;
    }
}


示例

下面是一个简单的示例,展示了如何使用DelayQueue

第一步:定义延时任务

/**
 * DelayTask类实现了Delayed接口,用于表示延迟任务。
 * @param <T> 任务数据的类型
 */
@Data
public class DelayTask<T> implements Delayed {

    // 执行的任务数据
    private T data;
    // 任务的执行时间,使用纳秒表示
    private long activeTime;

    public DelayTask(T data, Duration delayTime) {
        this.data = data;
        this.activeTime = System.nanoTime() + delayTime.toNanos();
    }

    /**
     * 获取任务的剩余延迟时间。
     *
     * @param unit 时间单位
     * @return 剩余的延迟时间,单位为指定的时间单位
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // unit时间单位
        // 而 convert 方法用于将时间从一个单位转换到另一个单位
        return unit.convert(Math.max(0, activeTime - System.nanoTime()), TimeUnit.NANOSECONDS);
    }

    /**
     * 比较两个延迟任务的顺序。
     *
     * @param o 另一个延迟任务
     * @return 比较结果,负数表示当前任务在前,正数表示当前任务在后,0表示相等
     */
    @Override
    public int compareTo(Delayed o) {
        long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        if (l > 0) {
            return 1;
        } else if (l < 0) {
            return -1;
        } else {
            return 0;
        }
    }
}

第二步:测试

/**
 * DelayTaskTest类用于测试DelayQueue的使用。
 * 通过SpringBootTest注解加载Spring应用上下文。
 */
@SpringBootTest(classes = LearningApplication.class)
@Slf4j  
public class DelayTaskTest {
    @Test
    public void test() throws InterruptedException {
        // 创建一个DelayQueue,用于存放DelayTask任务
        DelayQueue<DelayTask> tasks = new DelayQueue<>();

        tasks.add(new DelayTask("task1", Duration.ofSeconds(3)));
        tasks.add(new DelayTask("task2", Duration.ofSeconds(6)));
        tasks.add(new DelayTask("task3", Duration.ofSeconds(7)));

        // 无限循环,从队列中取出并执行任务
        while (true) {
            // 从队列中取出到期的任务,如果没有到期任务则阻塞等待
            DelayTask take = tasks.take();
            // 处理任务 这里仅打印
            log.info("{} is executed", take.getData());
        }
    }
}

DelayTaskTest类通过测试方法展示了如何使用DelayQueue来处理延迟任务。它向队列中添加了多个延迟任务,并通过无限循环从队列中取出并执行到期的任务。这种机制在定时任务调度、延迟消息处理等场景中非常有用。

取出任务用take(): 检索并移除队列头部的元素,如果没有到期元素则阻塞

内部实现原理

DelayQueue的内部实现依赖于PriorityQueuePriorityQueue保证了队列元素的自然顺序或通过提供的比较器进行排序。

  • 添加元素:当元素被添加到DelayQueue时,会根据其到期时间进行排序。【有序】
  • 提取元素:只有在getDelay()方法返回的延迟时间小于等于零时,元素才能从队列中提取出来。

应用场景

DelayQueue可以应用在以下场景:

  1. 定时任务调度:用于执行延迟任务。比如订单超时取消
  2. 缓存过期处理:在缓存中存储元素,并在元素到期时将其移除。
  3. 消息延迟处理:在消息队列中处理延迟消息。

小结

DelayQueue在处理延迟任务和定时调度任务中非常有用。通过实现Delayed接口,可以轻松地创建自定义的延迟元素,并将其添加到DelayQueue中进行管理。

模拟超时订单处理

Order 实体类

@Data
public class Order implements Delayed {
    private String orderId;
    private boolean completed;
    private long endTime;

    public Order(String orderId, Duration delayTime) {
        this.orderId = orderId;
        this.completed = false;
        this.endTime = System.currentTimeMillis() + delayTime.toMillis();
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

OrderController 类

@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private OrderService orderService;

    @PostMapping("/create")
    public String createOrder(@RequestParam String orderId, @RequestParam long delayInSeconds) {
        orderService.createOrder(orderId, delayInSeconds);
        return "Order created: " + orderId;
    }

    @PostMapping("/complete")
    public String completeOrder(@RequestParam String orderId) {
        boolean result = orderService.completeOrder(orderId);
        return result ? "Order completed: " + orderId : "Order not found or already completed: " + orderId;
    }
  
    // ...
}

OrderService 类

@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    private final DelayQueue<Order> delayQueue = new DelayQueue<>();

    public OrderService() {
        OrderConsumer consumer = new OrderConsumer(delayQueue, this);
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
    }

    public void createOrder(String orderId, long delayInSeconds) {
        Order order = new Order(orderId, Duration.ofSeconds(delayInSeconds));
        delayQueue.add(order);
        orderMapper.insertOrder(order); // 插入订单到数据库
    }

    public boolean completeOrder(String orderId) {
        for (Order order : delayQueue) {
            if (order.getOrderId().equals(orderId) && !order.isCompleted()) {
                order.complete();
                orderMapper.updateOrderStatus(orderId, "COMPLETED");
                return true;
            }
        }
        return false;
    }

    public void cancelOrder(String orderId) {
        orderMapper.updateOrderStatus(orderId, "CANCELLED");
    }
}

OrderConsumer 类

@Slf4j
public class OrderConsumer implements Runnable {
    private final DelayQueue<Order> delayQueue;
    private final OrderService orderService;

    public OrderConsumer(DelayQueue<Order> delayQueue, OrderService orderService) {
        this.delayQueue = delayQueue;
        this.orderService = orderService;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Order order = delayQueue.take();
                if (!order.isCompleted()) {
                    log.info("Order {} is cancelled due to timeout.", order.getOrderId());
                    orderService.cancelOrder(order.getOrderId());
                } else {
                    log.info("Order {} is completed.", order.getOrderId());
                }
            } catch (InterruptedException e) {
                log.error(e);
            }
        }
    }
}

❤觉得有用的可以留个关注~❤

相关推荐

  1. DelayQueue实现任务

    2024-07-12 22:36:05       19 阅读
  2. 基于Redis实现任务

    2024-07-12 22:36:05       34 阅读
  3. SpringBoot 异步任务

    2024-07-12 22:36:05       109 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-12 22:36:05       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-12 22:36:05       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-12 22:36:05       58 阅读
  4. Python语言-面向对象

    2024-07-12 22:36:05       69 阅读

热门阅读

  1. P1035 [NOIP2002 普及组] 级数求和 题解

    2024-07-12 22:36:05       20 阅读
  2. 留学生需要注意所谓的“写作套路”

    2024-07-12 22:36:05       23 阅读
  3. C++ 项目实践课设 图书馆管理系统

    2024-07-12 22:36:05       17 阅读
  4. 计算机网络 5.6网桥与交换机

    2024-07-12 22:36:05       15 阅读
  5. 安全开发--多语言基础知识

    2024-07-12 22:36:05       18 阅读
  6. Requests库如何用于发送HTTP请求

    2024-07-12 22:36:05       24 阅读