CountDownLatch实战应用——实现异步多线程业务处理,异常情况回滚全部子线程

😊 @ 作者: 一恍过去
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——实现异步多线程业务处理,异常情况回滚全部子线程
⏱️ @ 创作时间: 2023年12月1日7

在这里插入图片描述

1、概述

CountDownLatch是一个同步器工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,不可重置使用。

2、实现

使用一个计数器进行实现,计数器初始值为线程的数量,当每一个线程完成自己任务后,计数器的值就会减一,当计数器的值为0时,在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

3、方法说明:

  • public void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
  • public viod await() /boolean await(long timeout,TimeUnit unit) :使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果计数到达零,则该方法返回true值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待。参数:timeout-要等待的最长时间、unit-timeout 参数的时间单位

4、代码实例

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
   

    @Resource
    private CountDownService countDownService;
 
     /**
     * CountDownLatch实现异步多线程不同业务处理,不同service,异常情况回滚全部子线程
     *
     * @return
     */   
    @GetMapping("/countDown/handleDataBack")
    public String countDownHandleDataBack() {
   
        countDownService.handleDataBack();
        return "success";
    }

Sevice:

@Service
@Slf4j
public class CountDownService {
   


    @Resource
    private TestMapper testMapper;


    @Resource
    private ApplicationContext applicationContext;
    
    /**
     * 主线程,同时调用多个个子线程进行业务处理,当其中一个子线程出现异常,则全部子线程进行回滚
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleDataBack() {
   
        AtomicBoolean errorTag = new AtomicBoolean(false);
        // 设置countDown大小,与异步执行的业务数量一样,比如2个
        CountDownLatch countDownLatch = new CountDownLatch(2);
        // 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚
        CountDownLatch errorCountDown = new CountDownLatch(1);

        // 异步调用其他Service,执行业务处理
        CountDownService bean = applicationContext.getBean(CountDownService.class);
        bean.handleTestOne(countDownLatch, errorCountDown, errorTag);
        bean.handleTestTwo(countDownLatch, errorCountDown, errorTag);

        try {
   
            // 主线程阻塞
            countDownLatch.await();
            // 可以设置最大阻塞时间,防止线程一直挂起
            /*boolean await = countDownLatch.await(1, TimeUnit.SECONDS);
            if (!await) {
                // 超过时间子线程都还没有结束,直接都回滚
                errorTag.set(true);
            }*/
            log.info("继续执行主线程");

            // 继续执行后续的操作,比如insert、update等
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(1);
            entity.setCommodityCode("handleTestMain");
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-handleTestMain");
            testMapper.insert(entity);

        } catch (Exception e) {
   
            log.error("主线程业务执行异常");
            errorTag.set(true);
        } finally {
   
            // 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中
            errorCountDown.countDown();
        }

        // 如果出现异常
        if (errorTag.get()) {
   
            throw new RuntimeException("异步业务执行出现异常");
        }
        log.info("主线程执行完成");
    }

    /**
     * 子线程具体业务处理
     */
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    @Async
    public void handleTestOne(CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {
   
        log.info("开始执行handleTestOne线程");
        // 模拟业务耗时
        ThreadUtil.sleep(2000);
        try {
   
            // 执行数据库操作
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(1);
            entity.setCommodityCode("handleTestOne");
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-handleTestOne");
            testMapper.insert(entity);

            // 模拟出现异常
            int a = 1 / 0;

        } catch (Exception e) {
   
            errorTag.set(true);
        }
        // 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
        countDownLatch.countDown();

        // 子阻塞,直到其他子线程完成操作
        try {
   
            errorCountDown.await();
        } catch (Exception e) {
   
            errorTag.set(true);
        }
        log.info("handleTestOne-子线程执行完成");
        if (errorTag.get()) {
   
            // 抛出异常,回滚数据
            throw new RuntimeException("handleTestOne-子线程业务执行异常");
        }
    }

    /**
     * 子线程具体业务处理
     */
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    @Async
    public void handleTestTwo(CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {
   
        log.info("开始执行handleTestTwo线程");
        // 模拟业务耗时
        ThreadUtil.sleep(500);
        try {
   
            // 执行数据库操作
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(1);
            entity.setCommodityCode("handleTestTwo");
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-handleTestTwo");
            testMapper.insert(entity);

        } catch (Exception e) {
   
            errorTag.set(true);
        }
        // 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
        countDownLatch.countDown();

        // 子阻塞,直到其他子线程完成操作
        try {
   
            errorCountDown.await();
        } catch (Exception e) {
   
            errorTag.set(true);
        }
        log.info("handleTestTwo-子线程执行完成");
        if (errorTag.get()) {
   
            // 抛出异常,回滚数据
            throw new RuntimeException("handleTestTwo-子线程业务执行异常");
        }
    }
}

在这里插入图片描述

相关推荐

  1. Thinkphp+workman+redis实现线异步任务处理

    2023-12-17 16:02:02       39 阅读
  2. PyQt线处理业务事件

    2023-12-17 16:02:02       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-17 16:02:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-17 16:02:02       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-17 16:02:02       82 阅读
  4. Python语言-面向对象

    2023-12-17 16:02:02       91 阅读

热门阅读

  1. C语言之枚举类型

    2023-12-17 16:02:02       65 阅读
  2. cisco packet tracer 路由器之间连线

    2023-12-17 16:02:02       53 阅读
  3. python单例模式

    2023-12-17 16:02:02       61 阅读
  4. 【面试】在Python中如何实现单例模式

    2023-12-17 16:02:02       48 阅读
  5. 多线程中的单例模式

    2023-12-17 16:02:02       69 阅读
  6. MySQL之锁

    2023-12-17 16:02:02       58 阅读
  7. AI创作的科幻小说[银河]03

    2023-12-17 16:02:02       57 阅读
  8. 物联网有哪些关键技术?

    2023-12-17 16:02:02       51 阅读
  9. 案例:xpath实例+功能

    2023-12-17 16:02:02       48 阅读
  10. Temporal 常见 F&Q 速查

    2023-12-17 16:02:02       64 阅读
  11. 使用Verdaccio搭建私有npm仓库

    2023-12-17 16:02:02       65 阅读