spring boot项目对接阿里云的RocketMq5

要在Spring Boot项目中实现一个通用的消息消费服务,可以将前面的概念整合并利用Spring的依赖注入特性来创建一个更灵活、可配置的服务。下面是如何创建这样的服务,包括通过application.properties来配置连接信息,以及使用@Service注解定义消费服务。

步骤 1: 配置application.properties

首先,在application.properties文件中添加RocketMQ的配置信息。

# RocketMQ 配置
rocketmq.endpoint=你的RocketMQ接入点
rocketmq.username=你的RocketMQ用户名
rocketmq.password=你的RocketMQ密码

步骤 2: 创建RocketMQConsumerService

接下来,定义RocketMQConsumerService服务类。这个类将读取application.properties中的配置,并提供一个方法来启动消息消费者。

package com.aliyun.openservices;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;

@Service
public class RocketMQConsumerService {

    @Value("${rocketmq.endpoint}")
    private String endpoint;
    @Value("${rocketmq.username}")
    private String username;
    @Value("${rocketmq.password}")
    private String password;

    public void startConsumer(String topicName, String filterExpression, String consumerGroupId) throws ClientException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoint)
            .setCredentialProvider(new StaticSessionCredentialsProvider(username, password))
            .build();

        FilterExpression expression = new FilterExpression(filterExpression, FilterExpressionType.TAG);

        PushConsumer consumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(configuration)
            .setConsumerGroup(consumerGroupId)
            .setSubscriptionExpressions(Collections.singletonMap(topicName, expression))
            .setMessageListener(messageView -> {
                // 实现你的消息处理逻辑
                System.out.println("Received message: " + messageView.toString());
                return ConsumeResult.SUCCESS;
            })
            .build();
        
        // 注意: 实际应用中你可能需要更优雅的方式来启动和关闭Consumer
    }
}

步骤 3: 使用服务

最后,你可以在Spring Boot应用的任何地方注入并使用RocketMQConsumerService服务。例如,在一个配置类或启动监听器中启动消费者:

package com.aliyun.openservices;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class RocketMQConsumerRunner implements CommandLineRunner {

    @Autowired
    private RocketMQConsumerService rocketMQConsumerService;

    @Override
    public void run(String... args) throws Exception {
        // 启动消费者
        rocketMQConsumerService.startConsumer("topicName", "*", "consumerGroupId");
    }
}

这个CommandLineRunner实现确保了当Spring Boot应用启动时,会自动启动消息消费服务。你需要根据实际情况替换topicNamefilterExpression(这里用*表示接收所有消息)和consumerGroupId的值。

通过这种方式,你可以轻松地在Spring Boot应用中集成和使用RocketMQ的消费服务,同时保持高度的灵活性和配置能力。

相关推荐

  1. spring boot项目对接阿里RocketMq5

    2024-03-27 07:46:04       43 阅读
  2. 对比阿里SofaMQ与RocketMQ

    2024-03-27 07:46:04       48 阅读
  3. 阿里RocketMQ消费MQTT消息

    2024-03-27 07:46:04       33 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-27 07:46:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-27 07:46:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-27 07:46:04       82 阅读
  4. Python语言-面向对象

    2024-03-27 07:46:04       91 阅读

热门阅读

  1. tomcat和web服务器是什么??

    2024-03-27 07:46:04       43 阅读
  2. 数据结构-----顺序队列和链式队列的应用

    2024-03-27 07:46:04       42 阅读
  3. 使用docker的好处???(docker的优势)

    2024-03-27 07:46:04       42 阅读
  4. MongoDB面试专题

    2024-03-27 07:46:04       35 阅读