1. 在pom.xml中添加rocketmq依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
注意:rocketmq的版本需要与java版本对应,rocketmq starter包含了
rocketmq-client
2. 配置yaml文件
server:
port: 8081
rocketmq:
consumer:
group: defaultGroup
name-server: 127.0.0.1:9876
producer:
group: defaultGroup
3. 生产者代码代码
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "defaultGroup")
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("Received message: " + s);
}
}
4. 消费者代码
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Service @RocketMQMessageListener(topic = "test-topic", consumerGroup = "defaultGroup") public class ConsumerService implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("Received message: " + s); } }
5. 测试案例
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MQ_Test implements CommandLineRunner {
private final ProducerService producer;
public MQ_Test(ProducerService producerService) {
this.producer = producerService;
}
public static void main(String[] args) {
SpringApplication.run(MQ_Test.class, args);
}
@Override
public void run(String... args) throws Exception {
producer.sendMessage("test-topic", "Hello RocketMQ!");
}
}
结果如下: