1. 环境准备
maven(基于阿里云SDK)
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.5.Final</version>
</dependency>
2. 普通消息
2.1 同步阻塞消息
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY));
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook(), true, null);
producer.setAccessChannel(AccessChannel.CLOUD);
producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
producer.start();
// 普通消息
SendResult sendResult = producer.send(createMsg("student 放学了"));
producer.shutdown();
}
public static Message createMsg(String msgBody) throws UnsupportedEncodingException {
Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setBody("hello mq".getBytes());
return msg;
}
SendResult sendResult = producer.send(createMsg("student 放学了"));
当前线程需要等待mq返回插入消息的结果
2.2 批量发送
2.3 非阻塞消息
// 普通消息,不会等待返回结果,可能出现失败的问题
producer.sendOneway(createMsg("普通消息-不会等待返回结果"));
问题:发送失败会不会丢失?
猜测:会
3. 顺序消息
3.1 分区有序
//方法 msg消息体 selector 选择queue的方法 arg:selector中需要的参数 orderId:就是select中的arg
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(MqConfig.ORDER_GROUP_ID, getAclRPCHook(), true, null);
producer.setAccessChannel(AccessChannel.CLOUD);
producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
producer.start();
int orderId=100;
SendResult sendResult = producer.send(createMsg("分区顺序消息"), new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
producer.shutdown();
}
public static Message createMsg(String msgBody) throws UnsupportedEncodingException {
Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setBody("hello mq".getBytes());
return msg;
}
3.1.1 适用场景
- 用户订单,根据订单id的维度做分区
- 用户组织架构变更:saas环境按照企业id做分区
3.2 全局顺序
将所有的数据放入同一个分区(一般分区有序就够使用了)
问题:无法并行的消费;broker故障转移影响的范围大
4. 事务消息
发送半消息并确认
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY));
}
public static void main(String[] args) throws Exception {
/**
* 创建事务消息Producer
*/
TransactionMQProducer transactionMQProducer = new TransactionMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
transactionMQProducer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
transactionMQProducer.start();
// 需要保存sendResult中的msgId,因为二次提交消息失败后,mq会通过回调方式查询结果
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(createMsg("事务消息"), new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("开始执行本地事务: " + msg);
return LocalTransactionState.COMMIT_MESSAGE;
}
}, null);
}
public static Message createMsg(String msgBody) throws UnsupportedEncodingException {
Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setBody("hello mq".getBytes());
return msg;
}
mq回调反查本地事务结果
public class LocalTransactionCheckerImpl implements TransactionCheckListener {
@Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId());
return LocalTransactionState.COMMIT_MESSAGE;
}
}
4.1 解决问题
实现与本地事务一致性的处理(如解决问题:本地事务回滚/未保存,但是向MQ提交的消息可能已经倍消费端消费;如果本地事务先提交,然后再发送到MQ,会出现丢失问题)
4.2 实现原理
mq首先将消息放入半消息队列中,然后等待Producer再次确认是否提交。mq一直没有收到确认结果,会回调反查
4.3 替代方案-事件表方案
db创建一张"事件表",将向mq发送的事务消息,insert到mysql。
开一个线程一直读取该表,然后向MQ同步写
定时清理已处理完成的事件,避免事件表过大导致查询慢
优点:简单、性能尚可
5. 延迟消息
6. 发送消息重试机制
原理:
建议:使用try-catch包裹发送的消息。 理由:如果mq客户端有bug,导致重试失败,我们自己的业务逻辑可以重试,重试失败,可以暂存DB
ps:曾经有一种异常,该异常的返回值code没有被重试机制捕捉到,导致重试机制失败
待续......