stream流中的坑,peek/map/filter

起因

所在系统为一个对账系统,涉及的业务为发布账单,数据结构定的是供应商账单发布,生成企业账单和个人账单。发布账单处理完本系统业务后,需要生成站内通知和调用外部接口生成短信通知。后来增加需求,需要在发布完成后调用外部接口向总平台发送对应人员的待办。看着是不是还行,逻辑上没什么问题,磨人的地方就在stream流的各种处理时。
先简单介绍一下原代码,原代码中在处理最外层增加了事务保障,在处理完本系统的逻辑业务之后,使用publishEvent转而去发站内通知和短信通知。所以我是直接在publishEvent中去发待办。

事件的整个流程

在这里插入图片描述

处理过程中遇到的问题以及代码版本

版本一(此时未增加50个限制)

版本一代码
if (event instanceof BillPublishedEvent) {
            try {
                log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
                BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();

                List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
                if(billPersonList.size()>0){
                    List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
                    List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(User::getUserName).collect(Collectors.toList());
                    String modelId = CommUtils.makeAutoGUID();
                    Integer toDoResult = toDoMessageAppService.sendToDoMessage(modelId);
                    if(toDoResult.equals("200")){
                        boolean update = billPersonService.lambdaUpdate()
                                .set(BillPerson::getPublishToDo, modelId.concat("_10"))
                                .in(BillPerson::getGuid, billPersonList.stream().map(BillPerson::getGuid).collect(Collectors.toList()))
                                .update();
                        if(update){
                            log.info("{}的个人账单发布账单待办通知修改完成");
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
版本一遇到的问题(事务中的增删改查只会在整个处理完成后执行)

问题所在代码为:

问题是:别看执行完成了,返回的update为true,但其实根本就没有更新。原因是整个处理过程是在事务中。
经过这次我理解的事务中的任何修改、生成、删除等操作都只会在整个处理过程完成后执行,其余时间都是保存在某个地方。而下面这个更新实际是从数据库中获取再更新,那当然不可能更新成功。
当时发现是因为billPerson数据是需要insert进去的,但执行到这步,查看数据库时根本没数据。
boolean update = billPersonService.lambdaUpdate()
                                .set(BillPerson::getPublishToDo, modelId.concat("_10"))
                                .in(BillPerson::getGuid, billPersonList.stream().map(BillPerson::getGuid).collect(Collectors.toList()))
                                .update();
后修改为
if (event instanceof BillPublishedEvent) {
            try {
                log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
                BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();
                List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
                if (billPersonList.size() > 0) {
                    List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
                    List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(User::getUserName).collect(Collectors.toList());
                    String modelId = CommUtils.makeAutoGUID();
                    try {
                        Integer toDoResult = toDoMessageAppService.sendToDoMessage(personUserNameList,modelId);
                        if (toDoResult.equals("200")) {
                            String modelIdNew = modelId.concat("_10");
                            billPersonList = billPersonList.stream().peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

版本二 (增加50个限制)

版本二代码
if (event instanceof BillPublishedEvent) {
            try {
                log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
                BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();
                List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
                if (billPersonList.size() > 0) {
                    List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
                    List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(User::getUserName).collect(Collectors.toList());
                    String modelId = CommUtils.makeAutoGUID();
                    while(!personUserNameList.isEmpty()){
                        // 此处截取50个手机号已发送待办逻辑是:
                        // 先取出前50个手机号发送待办,然后从原list中去除这50个手机号。
                        List<String> currentHandleList = personUserNameList.subList(0, Math.min(50, personUserNameList.size()));
                        try {
                            Integer toDoResult = toDoMessageAppService.sendToDoMessage(currentHandleList,modelId);
                            personUserNameList = personUserNameList.subList(50,personUserNameList.size());
                            if (toDoResult.equals("200")) {
                                String modelIdNew = modelId.concat("_10");
                                billPersonList = billPersonList.stream().peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            continue;
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
版本二遇到的问题(循环中不可修改list)

问题所在代码为:

// 会出现 ConcurrentModificationException 异常,因为做了个蠢事,不可以在循环中删除List,因为循环本身是根据index定位的
List<String> currentHandleList = personUserNameList.subList(0, Math.min(50, personUserNameList.size()));
personUserNameList = personUserNameList.subList(50,personUserNameList.size());                
后修改为
                    int paramSize = 50;
                    for (int i = 0; i < personUserNameList.size(); i += paramSize) {
                        List<BillPerson> currentHandlePersonList = new ArrayList<>();
                        int endIndex = Math.min(i + paramSize, personUserNameList.size());
                        List<String> subList = personUserNameList.subList(i, endIndex);
                        // 后面跟处理
                    }

版本三 (真正的stream流坑)

版本三代码
if (event instanceof BillPublishedEvent) {
            try {
                log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
                BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();

                // 此处是为了向商网办公发送待办
                List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
                if (billPersonList.size() > 0) {
                    Map<Integer, String> map = new HashMap<>();  // key为personId或者说是userId,value为userName
                    List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
                    // 因为billPerson表只有personId,user表才有userId和user手机号,这里将personId和手机号作为key-value写入map中了,以便50个手机号执行完后更新对应的个人账单
                    List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(user -> {
                        if (!map.containsKey(user.getUserId())) {
                            map.put(user.getUserId(), user.getUserName());
                        }
                        return user.getUserName();
                    }).collect(Collectors.toList());
                    String modelId = CommUtils.makeAutoGUID();
                    int paramSize = 50;

                    for (int i = 0; i < personUserNameList.size(); i += paramSize) {
                        int endIndex = Math.min(i + paramSize, personUserNameList.size());
                        List<String> subList = personUserNameList.subList(i, endIndex);
                        log.info("i为{},endIndex为{},subList为{}", i, endIndex, subList);
                        try {
                            Integer toDoResult = toDoMessageAppService.sendToDoMessage(subList, modelId);
                            // 这里要求休息一会,不然会被检测为爆破
                            Thread.sleep(3000);
                            
                            if (toDoResult.equals("200")) {
                                List<String> subListCopy = subList;
                                List<Integer> personidListHandled = map.entrySet().stream().filter(entry -> subListCopy.contains(entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toList());
                                log.info("这次处理的用户对应的personId为{}", personidListHandled);
                                String modelIdNew = modelId.concat("_10");
                                // 找到全量billPersonList中这次处理的个人账单list,并修改某个值。
                                billPersonList = billPersonList.stream().filter(e -> personidListHandled.contains(e.getPersonId())).peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
                                
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            continue;
                        }
                    }
                } else {
                    // log.info("容器本身事件:" + event);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
版本三问题

问题所在代码为

此时,版本一中的问题重现了,因为个人账单中的那个字段根本就没有修改。
我可以理解filter筛选出来了,但接收时是使用的原list,按说在第一次循环有50条数据,但之后的循环就不会有数据能被筛选出来了
但是就不,偏偏数据都在,但是那个字段根本就没更新。
billPersonList = billPersonList.stream().filter(e -> personidListHandled.contains(e.getPersonId())).peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
最终版
// 这里要用新list接收一下
currentHandlePersonList = billPersonList.stream().filter(e -> personidListHandled.contains(e.getPersonId())).peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());

List<BillPerson> targetPersonList = currentHandlePersonList;   // 这里需要再赋值一下,否则在下面在stream().filter时会报错
// 此处使用replaceAll替换,只要两者的某个条件相同,就用List2中的数据替换list1中的数据
billPersonList.replaceAll(e -> {
	List<BillPerson> currentHandlePersonListOne = 
	targetPersonList.stream().filter(tar -> tar.getGuid().equals(e.getGuid())).collect(Collectors.toList());
	if(currentHandlePersonListOne.size() >0){
      	return currentHandlePersonListOne.get(0);
	}
  	return e;
});

这次还学到

map中根据value值获取到对应的key值们

List<Integer> personidListHandled = map.entrySet().stream().filter(entry -> subListCopy.contains(entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toList());

相关推荐

  1. Stream方法详解

    2024-03-30 18:42:03       14 阅读
  2. StreamanyMatch和allMatch和noneMatch区别

    2024-03-30 18:42:03       38 阅读
  3. stream常见使用

    2024-03-30 18:42:03       7 阅读
  4. <span style='color:red;'>Stream</span><span style='color:red;'>流</span>

    Stream

    2024-03-30 18:42:03      31 阅读
  5. Stream

    2024-03-30 18:42:03       40 阅读
  6. <span style='color:red;'>Stream</span><span style='color:red;'>流</span>

    Stream

    2024-03-30 18:42:03      34 阅读
  7. <span style='color:red;'>Stream</span><span style='color:red;'>流</span>

    Stream

    2024-03-30 18:42:03      22 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-30 18:42:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-30 18:42:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-30 18:42:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-30 18:42:03       20 阅读

热门阅读

  1. redis学习-缓存穿透、缓存击穿、缓存雪崩

    2024-03-30 18:42:03       19 阅读
  2. Linux安装MySQL8.0

    2024-03-30 18:42:03       20 阅读
  3. 弹幕游戏开发-积分与连胜

    2024-03-30 18:42:03       17 阅读
  4. Redis的数据类型String使用场景实战

    2024-03-30 18:42:03       16 阅读
  5. ansible-2

    ansible-2

    2024-03-30 18:42:03      14 阅读
  6. Libpcap 学习笔记

    2024-03-30 18:42:03       19 阅读
  7. 飞机大作战(c语言)

    2024-03-30 18:42:03       18 阅读
  8. Leetcode 680. 验证回文串 II

    2024-03-30 18:42:03       16 阅读
  9. C++入门——基础语法(一)

    2024-03-30 18:42:03       12 阅读
  10. css的各种样式

    2024-03-30 18:42:03       14 阅读
  11. Qt常用容器之:QMap

    2024-03-30 18:42:03       17 阅读
  12. Android 8.1 预置应用开机自启动

    2024-03-30 18:42:03       15 阅读