消息队列中间件,RabbitMQ的使用,利用枚举实现队列,交换机,RountKey的声明

目录

1.声明队列和交换机以及RountKey

2.初始化循环绑定

3.监听队列


1.声明队列和交换机以及RountKey

package com.hykj.dpp.dcn.configure;

import lombok.Getter;


@Getter
public enum RabbitmqBind {

    /**
     * 接收信审推送清洗数据
     */
	DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true),

    SMS_CLEAN(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true),
    LOAN_ACCOUNT_MODIFY(
            RabbitMqExchangeEnum.E_TOPIC_PAY,
            RabbitMqQueueConstants.Q_MODIFY_ACCOUNT,
            RabbitmqRoutingKey.K_MODIFY_ACCOUNT,
            true)

    ;

	/**
	 * 交换机
	 */
    private RabbitMqExchangeEnum exchange;

    /**
     * 队列名称
     */
    private String queueName;

    /**
     * 路由Key
     */
    private RabbitmqRoutingKey routingKey;

    /**
     * 绑定标识
     */
    private Boolean isBind;

    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
    }

    /**
     * 交换机
     */
    @Getter
    public enum RabbitMqExchangeEnum {

        /**
         * 交换机定义,类型 - 名称
         */
        E_DIRECT_RCP("direct","E_DIRECT_RCP"),

        E_TOPIC_RCP("topic","E_TOPIC_RCP"),

        E_TOPIC_PAY("topic","E_TOPIC_PAY")
        ;

        private String exchangeType;

        private String exchangeName ;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * 队列名定义
     */
    public interface RabbitMqQueueConstants {

    	/**
    	 * 接收清洗数据
    	 */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";
        
        /**
         * 清洗结束通知
         */
        String Q_DATA_CLEAN_FINISH = "RMPS_TO_RCP_DATA_CLEAN_FINISH";

        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        String Q_MODIFY_ACCOUNT = "PAY_TO_DPP_MODIFY_ACCOUNT";
    }

    /**
     * routingKey
     */
    @Getter
    public enum RabbitmqRoutingKey {

        /**
         * 路由
         */
        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_DATA_CLEAN_FINISH("K_DATA_CLEAN_FINISH"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),
        K_MODIFY_ACCOUNT("#.K_MODIFY_ACCOUNT");

        private String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }

}

2.初始化循环绑定

package com.hykj.dpp.dcn.configure;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Arrays;

@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class RabbitConfiguration {

    @Autowired
    protected RabbitTemplate rabbitTemplate;
    @Autowired
    protected RabbitAdmin rabbitAdmin;

    public static final int DEFAULT_CONCURRENT = 10;
    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                 ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    @ConditionalOnMissingBean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @PostConstruct
    protected void init() {
        rabbitTemplate.setChannelTransacted(true);
        //创建exchange
        Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
                .forEach(rabbitMqExchangeEnum -> {
                        Exchange exchange = RabbitmqExchange
                                .getInstanceByType(rabbitMqExchangeEnum.getExchangeType())
                                .createExchange(rabbitMqExchangeEnum.getExchangeName());
                        rabbitAdmin.declareExchange(exchange);
                }

        );

        //创建队列并绑定exchange
        Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {
            if(RabbitmqBind.getIsBind()){
                rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),
                        true, false, false, null));
                rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                        Binding.DestinationType.QUEUE,
                        RabbitmqBind.getExchange().getExchangeName(), RabbitmqBind.getRoutingKey().getKeyName(), null));
            }
        });
    }
}

 绑定的形式由枚举类中定义

3.监听队列

package com.hykj.dpp.dcn.mq.listener;

import com.alibaba.fastjson.JSON;
import com.hykj.dpp.common.constant.*;
import com.hykj.dpp.common.model.RiskProcessLog;
import com.hykj.dpp.common.service.RiskProcessLogService;
import com.hykj.dpp.dcn.configure.RabbitmqBind;
import com.hykj.dpp.dcn.mq.message.DataCleanMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 数据清洗监听
 */
@Slf4j
@Component
@RabbitListener(queues = {
		RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5", containerFactory = "customContainerFactory")
public class DataCleanListener {
	
	@Resource
	private RiskProcessLogService riskProcessLogService;
	
	
	@RabbitHandler
	public void processMessage(String message) {
		log.info("DataClean recive message :{} ", message);
		process(message);
	}
	
	@RabbitHandler
	public void processMessage(byte[] message) {
		String msg = new String(message);
		log.info("DataClean recive message :{} ", msg);
		process(msg);
	}
	
	/**
	 * 处理推送消息
	 * @param message
	 */
	private void process(String message) {
		if(StringUtils.isBlank(message)) {
			log.error("process message is blank , message:{}" , message);
			return;
		}
		try {
			DataCleanMessage dataCleanMessage = JSON.parseObject(message, DataCleanMessage.class);
			if(StringUtils.isAnyBlank(dataCleanMessage.getBusinessId(),dataCleanMessage.getCountry())) {
				log.error("parse message is blank");
				return;
			}
			// 插入下个节点数据
			RiskProcessLog processLog = new RiskProcessLog();
			processLog.setBusinessId(dataCleanMessage.getBusinessId());
			processLog.setCountry(Country.getCode(dataCleanMessage.getCountry()));
			processLog.setOrg(dataCleanMessage.getOrg());
			processLog.setProfileType(ProfileType.prod.getCode());
			processLog.setSysType(SysType.RMPS.getCode());
            processLog.setServiceType(ServiceType.APPROVE.getCode());
			processLog.setNodeId(NodeDef.DATA_CLEAN);
			processLog.setPostInd(PostInd.PENDING.name());
			riskProcessLogService.save(processLog);
		} catch (Exception e) {
			log.error("process message appear exception: {}" , message, e);
		}
	}

}

 监听并处理任务

 

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-10 05:48:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-10 05:48:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-10 05:48:03       82 阅读
  4. Python语言-面向对象

    2024-04-10 05:48:03       91 阅读

热门阅读

  1. 题目:学习使用按位异或 ^

    2024-04-10 05:48:03       39 阅读
  2. git 提交一个pr

    2024-04-10 05:48:03       39 阅读
  3. epoll的使用示例及其解释

    2024-04-10 05:48:03       35 阅读
  4. tsconfig.json文件翻译

    2024-04-10 05:48:03       29 阅读
  5. 异步和同步

    2024-04-10 05:48:03       30 阅读
  6. 解析View树、apk安装

    2024-04-10 05:48:03       27 阅读
  7. Qt中的触屏事件

    2024-04-10 05:48:03       37 阅读
  8. jenkins部署项目报错:certificate has expired

    2024-04-10 05:48:03       36 阅读
  9. Git同时拉取和推送多个分支

    2024-04-10 05:48:03       38 阅读
  10. 10个全面了解python自动化办公代码

    2024-04-10 05:48:03       35 阅读
  11. [蓝桥杯 2013 省 B] 翻硬币

    2024-04-10 05:48:03       28 阅读
  12. 富格林:堤防交易陷阱安全做单交易

    2024-04-10 05:48:03       34 阅读
  13. 边缘设备上的chatGPT

    2024-04-10 05:48:03       34 阅读
  14. Redis 常用命令以及结构

    2024-04-10 05:48:03       33 阅读
  15. Android Binder——Kernel层介绍(七)

    2024-04-10 05:48:03       30 阅读