RocketMQ消费者依赖的业务Bean还未初始化完成


1.MyFactory的功能是将实现了 InterfaceApiPush 接口的Bean都放在一个静态的HashMap中.
2.MyMQConsumerListener作为一个MQ消费者,它会使用MyFactory的静态HashMap.

样例代码如下

// 工厂

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONConfig;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.shade.org.apache.commons.lang3.RandomStringUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanExpressionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

import com.infuq.InterfaceApiPush;


@Slf4j
@Component
public class MyFactory implements ApplicationContextAware {

    private static Map<String, InterfaceApiPush> pushHashMap = new HashMap<>();

    public InterfaceApiPush getInterfaceApiPush(String key) {
        return pushHashMap.get(key);
    }

    /**
     * 从上下文中获取推送消息的服务
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        // 加载实现了InterfaceApiPush接口的Bean
        Map<String, InterfaceApiPush> map = applicationContext.getBeansOfType(InterfaceApiPush.class);

        for (InterfaceApiPush pushService : map.values()) {
            if (ObjectUtil.isNotEmpty(pushService.getApiPushType())) {
                String pushKey = ...;            
                pushHashMap.put(pushKey, pushService);
            }
        }

        log.info("推送服务注册信息:{}", JSONObject.toJSONString(pushHashMap));
    }
}


MQ消费者

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

import com.infuq.MyFactory;

@Slf4j
@Service
public class MyMQConsumerListener implements MessageListenerConcurrently {
	
	@Autowired
	private MyFactory myFactory;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
		String key = pay_success_key;
		// 得到的pushService可能为空
		InterfaceApiPush pushService = myFactory.getInterfaceApiPush(key);

        for (MessageExt x : msgs) {
            System.out.println("[MQ消费者]:" + Thread.currentThread().getName() + "消费消息. msgId=" + x.getMsgId());
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

}

我们假设, 在程序代码中的确是有一个实现了InterfaceApiPush接口的Bean,它的key = pay_success_key. 然而在应用服务启动过程中, 从myFactory中可能得到的pushService是空.

因为在Spring容器启动过程中, org.springframework.context.support.AbstractApplicationContext#refresh 方法被执行, 它会创建所有的Bean实例. 然而当MQ消费者的Bean创建完成之后, MQ消费者就可以对外提供服务了,它就可以消费消息了, 但是这个时候, Spring容器都还没有把所有的Bean创建完成. 比如上述代码中, MyMQConsumerListener已经创建完成, 但是 MyFactory 还未初始化, 仅仅是实例化, 那么它的静态pushHashMap属性里的内容就是空. MQ消费就该失败, 需要重试消息.

也就是说Spring容器是否已经将所有业务Bean创建完成与MQ是否可以提供消费服务没有强依赖关系. 就好像,难道一个系统没有Tomcat就无法对外提供服务了吗? 它只是无法提供HTTP服务,但它引入Dubbo中间件就可以提供TCP协议的服务了.

因此一个业务系统,如果引入了MQ中间件,既要做到消息重试, 也要做到必要的幂等性.

相关推荐

  1. RocketMQ消费者依赖业务Bean初始化完成

    2024-07-16 05:28:03       23 阅读
  2. RocketMQ~生产者与消费者消费模式(pull or push)

    2024-07-16 05:28:03       20 阅读
  3. spring bean继承和依赖

    2024-07-16 05:28:03       32 阅读
  4. RocketMQ两种消费模式

    2024-07-16 05:28:03       34 阅读
  5. 使用 RocketMQ 实现消息顺序消费

    2024-07-16 05:28:03       26 阅读
  6. 26、Spring是如何解决Bean循环依赖

    2024-07-16 05:28:03       53 阅读
  7. SpringBean标签配置IOC和依赖注入详解

    2024-07-16 05:28:03       34 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-16 05:28:03       70 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-16 05:28:03       74 阅读
  3. 在Django里面运行非项目文件

    2024-07-16 05:28:03       62 阅读
  4. Python语言-面向对象

    2024-07-16 05:28:03       72 阅读

热门阅读

  1. 大数据如何推动工业数字化发展

    2024-07-16 05:28:03       23 阅读
  2. 【AI绘画教程】什么是Huggingface

    2024-07-16 05:28:03       27 阅读
  3. python开发面试-20240715

    2024-07-16 05:28:03       27 阅读
  4. 喜欢dp动态规划的第二天(暑假提升)

    2024-07-16 05:28:03       25 阅读
  5. 习题1 回文数 python、C++ 不同解法

    2024-07-16 05:28:03       30 阅读