在这篇文章中,我们将学习如何使用Apache RocketMQ来实现消息的同步、异步、单向和延迟发送。通过一些实际的代码示例,帮助大家快速上手RocketMQ的基本功能。
目录
引言
Apache RocketMQ是一款分布式消息中间件,具备高吞吐量、低延迟和高可用性,广泛用于消息队列、事件驱动和流处理等场景。本文将通过实例代码介绍RocketMQ的基本使用方法。
准备工作
在开始之前,请确保已经安装并配置好了RocketMQ服务器,并且客户端能够连接到服务器。以下是一个常量类MqConstant
,包含了NAME_SRV_ADDR
变量,用于存储RocketMQ的NameServer地址。这里localhost请你的服务器或者虚拟机ip,记得在防火墙把端口打开。
package com.takumilove.constant;
public class MqConstant {
public static final String NAME_SRV_ADDR = "localhost:9876";
}
同步消息
同步消息指生产者发送消息后,会等待服务器返回发送结果。
示例代码
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
public class ASimpleTest {
@Test
public void simpleProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("testTopic", "同步消息".getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult.getSendStatus());
}
producer.shutdown();
}
}
说明
- 创建
DefaultMQProducer
实例,并指定生产者组名。 - 设置NameServer地址。
- 启动生产者。
- 创建并发送消息,打印发送结果。
- 关闭生产者。
异步消息
异步消息指生产者发送消息后,不等待服务器返回结果,而是通过回调函数处理返回结果。
示例代码
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
public class BASyncTest {
@Test
public void asyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("asyncTopic", "异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败" + throwable.getMessage());
}
});
System.out.println("我先执行");
System.in.read();
}
}
说明
- 创建
DefaultMQProducer
实例,并指定生产者组名。 - 设置NameServer地址。
- 启动生产者。
- 创建并发送消息,使用
SendCallback
处理异步回调。 - 关闭生产者。
单向消息
单向消息指生产者只负责发送消息,不等待服务器返回结果,也没有回调函数。
示例代码
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
public class COneWayTest {
@Test
public void oneWayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("onewayTopic", "单向消息".getBytes());
producer.sendOneway(message);
System.out.println("消息已发送");
producer.shutdown();
}
}
说明
- 创建
DefaultMQProducer
实例,并指定生产者组名。 - 设置NameServer地址。
- 启动生产者。
- 创建并发送单向消息。
- 关闭生产者。
延迟消息
延迟消息指消息发送后,并不会立即被消费,而是延迟一段时间后才被消费。
示例代码
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
import java.util.Date;
public class DMsTest {
@Test
public void msProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("orderMsTopic", "延迟消息".getBytes());
message.setDelayTimeLevel(3);
producer.send(message);
System.out.println("发送时间:" + new Date());
producer.shutdown();
}
}
说明
- 创建
DefaultMQProducer
实例,并指定生产者组名。 - 设置NameServer地址。
- 启动生产者。
- 创建消息并设置延迟时间,延迟级别从1到18,分别表示1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h。
- 发送消息并打印发送时间。
- 关闭生产者。
总结
通过上述实例代码,我们学习了如何使用RocketMQ实现同步、异步、单向和延迟消息的发送与消费。希望本文能帮助大家掌握RocketMQ的基本使用方法,并在实际项目中灵活运用。