rocketMQ-发送消息

1. 环境准备

maven(基于阿里云SDK)

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.8.5.Final</version>
        </dependency>

2. 普通消息

2.1 同步阻塞消息

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY));
    }
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
        
        DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook(), true, null);
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
        producer.start();

        // 普通消息
        SendResult sendResult = producer.send(createMsg("student 放学了"));
        producer.shutdown();
    }

    public static Message createMsg(String msgBody) throws UnsupportedEncodingException {
        Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setBody("hello mq".getBytes());
        return msg;
    }

SendResult sendResult = producer.send(createMsg("student 放学了"));

当前线程需要等待mq返回插入消息的结果

2.2 批量发送

2.3 非阻塞消息

        // 普通消息,不会等待返回结果,可能出现失败的问题
        producer.sendOneway(createMsg("普通消息-不会等待返回结果"));

  问题:发送失败会不会丢失?

             猜测:会

3. 顺序消息

3.1 分区有序

 //方法  msg消息体   selector 选择queue的方法   arg:selector中需要的参数 orderId:就是select中的arg

public SendResult send(Message msg, MessageQueueSelector selector, Object arg)


    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer(MqConfig.ORDER_GROUP_ID, getAclRPCHook(), true, null);
        producer.setAccessChannel(AccessChannel.CLOUD);
        producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
        producer.start();

        int orderId=100;
        SendResult sendResult = producer.send(createMsg("分区顺序消息"), new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        producer.shutdown();
    }

    public static Message createMsg(String msgBody) throws UnsupportedEncodingException {
        Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setBody("hello mq".getBytes());
        return msg;
    }

 3.1.1 适用场景

  • 用户订单,根据订单id的维度做分区
  • 用户组织架构变更:saas环境按照企业id做分区

3.2 全局顺序

将所有的数据放入同一个分区(一般分区有序就够使用了)

问题:无法并行的消费;broker故障转移影响的范围大

4. 事务消息

发送半消息并确认

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY));
    }

    public static void main(String[] args) throws Exception {
        /**
         * 创建事务消息Producer
         */
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
        transactionMQProducer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
        transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());

        transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
        transactionMQProducer.start();

        // 需要保存sendResult中的msgId,因为二次提交消息失败后,mq会通过回调方式查询结果
        SendResult sendResult = transactionMQProducer.sendMessageInTransaction(createMsg("事务消息"), new LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                System.out.println("开始执行本地事务: " + msg);
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }, null);
    }

    public static Message createMsg(String msgBody) throws UnsupportedEncodingException {
        Message msg = new Message(MqConfig.TOPIC, MqConfig.TAG, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setBody("hello mq".getBytes());
        return msg;
    }

mq回调反查本地事务结果

public class LocalTransactionCheckerImpl implements TransactionCheckListener {
 
    @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId());
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

4.1 解决问题

       实现与本地事务一致性的处理(如解决问题:本地事务回滚/未保存,但是向MQ提交的消息可能已经倍消费端消费;如果本地事务先提交,然后再发送到MQ,会出现丢失问题)

4.2 实现原理

   mq首先将消息放入半消息队列中,然后等待Producer再次确认是否提交。mq一直没有收到确认结果,会回调反查

4.3 替代方案-事件表方案

db创建一张"事件表",将向mq发送的事务消息,insert到mysql。

开一个线程一直读取该表,然后向MQ同步写

定时清理已处理完成的事件,避免事件表过大导致查询慢

优点:简单、性能尚可

5. 延迟消息

6. 发送消息重试机制

原理:

建议:使用try-catch包裹发送的消息。  理由:如果mq客户端有bug,导致重试失败,我们自己的业务逻辑可以重试,重试失败,可以暂存DB

ps:曾经有一种异常,该异常的返回值code没有被重试机制捕捉到,导致重试机制失败

待续......

最近更新

  1. TCP协议是安全的吗?

    2023-12-07 03:50:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-07 03:50:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-07 03:50:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-07 03:50:02       18 阅读

热门阅读

  1. 获取图像大小 - 编程指南

    2023-12-07 03:50:02       38 阅读
  2. MongoDB导入导出命令

    2023-12-07 03:50:02       31 阅读
  3. 将Linux 标准输出,错误输出重定向到文件

    2023-12-07 03:50:02       39 阅读
  4. 二:C语言-数据类型和变量

    2023-12-07 03:50:02       33 阅读
  5. Django回顾3

    2023-12-07 03:50:02       26 阅读
  6. [Electron] 将应用日志文件输出

    2023-12-07 03:50:02       39 阅读
  7. 【debug】Image 库 字体问题

    2023-12-07 03:50:02       29 阅读
  8. React使用echarts并且修改echarts图大小

    2023-12-07 03:50:02       36 阅读
  9. 解释 Git 的基本概念和使用方式

    2023-12-07 03:50:02       26 阅读