《RocketMQ实战与进阶》 03 消息发送核心参数与工作原理详解

张开发
2026/4/18 3:35:32 15 分钟阅读

分享文章

《RocketMQ实战与进阶》 03 消息发送核心参数与工作原理详解
本篇将重点关注 DefaultMQProducer 中的相关属性以便从这些属性窥探 RocketMQ 消息发送较为底层的原理。从 DefaultMQProducer 的类图就可以看出其属性主要来源于 ClientConfig、DefaultMQProducer故接下来将分两部分进行介绍。DefaultMQProducer 参数一览DefaultMQProducer 的参数如下InternalLogger log ClientLogger.getLog()客户端的日志实现类RocketMQ 客户端的日志路径为${user.home}/logs/rocketmqlogs/rocketmq_client.log。在排查问题时可以从日志文件下手寻找错误日志为解决问题提供必要的信息。其中 user.home 为用户的主目录。producerGroup发送者所属组开源版本的 RocketMQ发送者所属组主要的用途是事务消息Broker 需要向消息发送者回查事务状态。可以通过相关命令或 RocketMQ-Console 查看某一个 Topic 指定消费组的客户端如下图所示defaultTopicQueueNums 4通过生产者创建 Topic 时默认的队列数量。sendMsgTimeout 3000消息发送默认超时时间单位为毫秒。值得注意的是在 RocketMQ 4.3.0 版本之前由于存在重试机制设置的设计为单次重试的超时时间即如果设置重试次数为 3 次则DefaultMQProducer#send方法可能会超过 9s 才返回该问题在 RocketMQ 4.3.0 版本进行了优化设置的超时时间为总的超时时间即如果超时时间设置 3s重试次数设置为 10 次可能不会重试 10 次例如在重试到第 5 次的时候已经超过 3s 了试图尝试第 6 次重试时会退出抛出超时异常停止重试。compressMsgBodyOverHowmuch压缩的阔值默认为 4k即当消息的消息体超过 4k则会使用 zip 对消息体进行压缩会增加 Broker 端的 CPU 消耗但能提高网络方面的开销。retryTimesWhenSendFailed同步消息发送重试次数。RocketMQ 客户端内部在消息发送失败时默认会重试 2 次。请主要该参数与 sendMsgTimeout 会联合起来生效详情请参照上文所述。retryTimesWhenSendAsyncFailed异步消息发送重试次数默认为 2即重试 2 次通常情况下有三次机会。retryAnotherBrokerWhenNotStoreOK该参数的本意是如果客户端收到的结果不是 SEND_OK应该是不问源由的继续向另外一个 Broker 重试但根据代码分析目前这个参数并不能按预期运作应该是一个 Bug。int maxMessageSize允许发送的最大消息体默认为 4M服务端Broker也有 maxMessageSize 这个参数的设置故客户端的设置不能超过服务端的配置最佳实践为客户端的配置小于服务端的配置。sendLatencyFaultEnable是否开启失败延迟规避机制。RocketMQ 客户端内部在重试时会规避上一次发送失败的 Broker如果开启延迟失败规避则在未来的某一段时间内不向该 Broker 发送消息具体机制在本篇的第三部分详细展开。默认为 false不开启。notAvailableDuration不可用的延迟数组默认值为 {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}即每次触发 Broker 的延迟时间是一个阶梯的会根据每次消息发送的延迟时间来选择在未来多久内不向该 Broker 发送消息。latencyMax设置消息发送的最大延迟级别默认值为 {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}个数与 notAvailableDuration 对应关于 Broker 的延迟关闭机制将在本文第三部详细探讨。ClientConfig 参数一览ClientConfig 顾名思义客户端的配置在 RocketMQ 中消息发送者Producer和消息消费者Consumer即上面的配置生产者、消费者是通用的。namesrvAddrNameServer 的地址列表。clientIP客户端 IP通过RemotingUtil#getLocalAddress方法获取在 4.7.0 版本中优先会返回不是 127.0.0.1 和 192.168 开头的最后一个 IPV4 或第一个 IPV6。客户端 IP 主要是用来定位消费者的clientIP 会当成客户端 id 的组成部分。如下图所示在菜单 [Consumer] 列表中点击一个消费组点击按钮 [client] 可以查阅其客户端消费者。instanceName客户端实例名称是客户端标识 CID 的组成部分在第三篇会详细其 CID 与场景的使用问题。unitName定义一个单元主要用途客户端 CID 的组成部分如果获取 NameServer 的地址是通过 URL 进行动态更新的话会将该值附加到当中即可以区分不同的获取 NameServer 地址的服务。clientCallbackExecutorThreads客户端 public 回调的线程池线程数量默认为 CPU 核数不建议改变该值。namespace客户端命名空间从 4.5.1 版本被引入在第三篇中已详细介绍。pollNameServerInterval客户端从 NameServer 更新 Topic 的间隔默认值 30s就 Producer、Consumer 会每隔 30s 向 NameServer 更新 Topic 的路由信息该值不建议修改。heartbeatBrokerInterval客户端向 Broker 发送心跳包的时间间隔默认为 30s该值不建议修改。persistConsumerOffsetInterval客户端持久化消息消费进度的间隔默认为 5s该值不建议修改。核心参数工作机制与使用建议消息发送高可用设计与故障规避机制熟悉 RocketMQ 的小伙伴应该都知道RocketMQ Topic 路由注册中心 NameServer 采用的是最终一致性模型而且客户端是定时向 NameServer 更新 Topic 的路由信息即客户端Producer、Consumer是无法实时感知 Broker 宕机的这样消息发送者会继续向已宕机的 Broker 发送消息造成消息发送异常。那 RocketMQ 是如何保证消息发送的高可用性呢RocketMQ 为了保证消息发送的高可用性在内部引入了重试机制默认重试 2 次。RocketMQ 消息发送端采取的队列负载均衡默认采用轮循。在 RocketMQ 中消息发送者是线程安全的即一个消息发送者可以在多线程环境中安全使用。每一个消息发送者全局会维护一个 Topic 上一次选择的队列然后基于这个序号进行递增轮循引入了 ThreadLocal 机制即每一个发送者线程持有一个上一次选择的队列用 sendWhichQueue 表示。接下来举例消息队列负载机制例如 topicA 的路由信息如下图所示正如上图所 topicA 在 broker-a、broker-b 上分别创建了 4 个队列例如一个线程使用 Producer 发送消息时通过对 sendWhichQueue getAndIncrement() 方法获取下一个队列。例如在发送之前 sendWhichQueue 该值为 broker-a 的 q1如果由于此时 broker-a 的突发流量异常大导致消息发送失败会触发重试按照轮循机制下一个选择的队列为 broker-a 的 q2 队列此次消息发送大概率还是会失败即尽管会重试 2 次但都是发送给同一个 Broker 处理此过程会显得不那么靠谱即大概率还是会失败那这样重试的意义将大打折扣。故 RocketMQ 为了解决该问题引入了故障规避机制在消息重试的时候会尽量规避上一次发送的 Broker回到上述示例当消息发往 broker-a q1 队列时返回发送失败那重试的时候会先排除 broker-a 中所有队列即这次会选择 broker-b q1 队列增大消息发送的成功率。上述规避思路是默认生效的即无需干预。但 RocketMQ 提供了两种规避策略该参数由sendLatencyFaultEnable控制用户可干预表示是否开启延迟规避机制默认为不开启。sendLatencyFaultEnable 设置为 false默认值不开启延迟规避策略只在重试时生效例如在一次消息发送过程中如果遇到消息发送失败规避 broekr-a但是在下一次消息发送时即再次调用 DefaultMQProducer 的 send 方法发送消息时还是会选择 broker-a 的消息进行发送只要继续发送失败后重试时再次规避 broker-a。sendLatencyFaultEnable 设置为 true开启延迟规避机制一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。这个延迟时间就是通过 notAvailableDuration、latencyMax 共同计算的就首先先计算本次消息发送失败所耗的时延然后对应 latencyMax 中哪个区间即计算在 latencyMax 的下标然后返回 notAvailableDuration 同一个下标对应的延迟值。温馨提示如果所有的 Broker 都触发了故障规避并且 Broker 只是那一瞬间压力大那岂不是明明存在可用的 Broker但经过你这样规避反倒是没有 Broker 可用来那岂不是更糟糕了针对这个问题会退化到队列轮循机制即不考虑故障规避这个因素按自然顺序进行选择进行兜底。实战经验分享按照实践经验RocketMQ Broker 的繁忙基本都是瞬时的而且通常与系统 PageCache 内核的管理相关很快就能恢复故不建议开启延迟机制。因为一旦开启延迟机制例如 5 分钟内不会向一个 Broker 发送消息这样会导致消息在其他 Broker 激增从而会导致部分消费端无法消费到消息增大其他消费者的处理压力导致整体消费性能的下降。客户端 ID 与使用陷进介绍客户端 ID 主要的目的是能在如下场景正确使用消息发送与消费。同一套代码能否在同一台机器上部署多个实例同一套代码能向不同的 NameServer 集群发送消息、消费消息吗本篇的试验环境部署架构如下部署了两套 RocketMQ 集群在 DefaultCluster 集群上创建 Topic——dw_test_01并在 DefaultClusterb 上创建 Topic——dw_test_02现在的需求是 order-service-app 要向 dw_test_01、dw_test_02 上发送消息。给出的示例代码如下public static void main(String[] args) throws Exception{ // 创建第一个生产者 DefaultMQProducer producer new DefaultMQProducer(dw_test_producer_group1); producer.setNamesrvAddr(192.168.3.10:9876); producer.start(); // 创建第二个生产者 DefaultMQProducer producer2 new DefaultMQProducer(dw_test_producer_group2); producer2.setNamesrvAddr(192.168.3.19:9876); producer2.start(); try { // 向第一个 RocketMQ 集群发送消息 SendResult result1 producer.send( new Message(dw_test_01 , hello 192.168.3.10 nameserver.getBytes())); System.out.printf(%s%n, result1); } catch (Throwable e) { System.out.println(-----first------------); e.printStackTrace(); System.out.println(-----first------------); } try { // 向第一个 RocketMQ 集群发送消息 SendResult result2 producer2.send( new Message(dw_test_02 , hello 192.168.3.19 nameserver.getBytes())); System.out.printf(%s%n, result2); } catch (Throwable e) { System.out.println(-----secornd------------); e.printStackTrace(); System.out.println(-----secornd------------); } //睡眠 10s简单延迟该任务的结束 Thread.sleep(10000); }运行结果如下图所示在向集群 2 发送消息时出现 Topic 不存在但明明创建了 dw_test_02而且如果单独向集群 2 的 dw_test_02 发送消息确能成功初步排查是创建了两个到不同集群的 Producer 引起的那这是为什么呢如果解决呢1. 问题分析要解决该问题首先得理解 RocketMQ Client 的核心组成部分如下图所示上述中几个核心关键点如下MQClientInstanceRocketMQ 客户端一个非常重要的对象代表一个 MQ 客户端并且其唯一标识为 clientId。该对象中会持有众多的消息发送者客户端 producerTable其键为消息发送者组同样可以创建多个消费组以消费组为键存储在 consumerTable 中。一个 JVM 进程中即一个应用程序中是否能创建多个 MQClientInstance 呢同样是可以的MQClientManager 对象持有一个 MQClientInstance 容器键为 clientId。那既然一个 JVM 中能支持创建多个生产者那为什么上面的示例中创建了两个生产者并且生产者组也不一样那为什么不能正常工作呢这是因为上述两个 Producer 对应的 clinetId 相同会对应同一个 MQClientInstance 对象这样两个生产者都会注册到一个 MQClientInstance即这两个生产者使用的配置为第一个生产者的配置即配置的 nameserver 地址为 192.168.3.10:9876而在集群 1 上并没有创建 topic——dw_test_02故无法找到对应的主题而抛出上述错误。我们可以通过调用 DefaultMQProducer 的 buildMQClientId() 方法查看其生成的 clientId运行后的结果如下图所示那解决思路就非常清晰了我们只需要改变两者的 clientId 即可故接下来看一下 RocketMQ 中 clientId 的生成规则。温馨提示该方法定义在 ClientConfig 中RocketMQ 生产者、消费者都是 ClientConfig 的子类。clientId 的生成策略如下clientIp客户端的 IP 地址。instanceName实例名称默认值为 DEFAULT但在真正 clientConfig 的 getInstanceName 方法时如果实例名称为 DEFAULT会自动将其替换为进程的 PID。unitName单元名称如果不为空则会追加到 clientId 中。了解到 clientId 的生成规则后提出解决方案已是水到渠成的事情了。2. 解决方案结合 clientId 三个组成部分我不建议修改 instanceName让其保持默认值 DEFAULT这样在真正的运行过程中会自动变更为进程的 pid这样能解决同一套代码在同一台机器上部署多个进程这样 clientId 并不会重复故我建议大家修改 unitName可以考虑将其修改为集群的名称修改后的代码如下所示public static void main(String[] args) throws Exception{ //省略代码 DefaultMQProducer producer2 new DefaultMQProducer(dw_test_producer_group2); producer2.setNamesrvAddr(192.168.3.19:9876); producer2.setUnitName(DefaultClusterb); producer2.start(); //省略代码运行结果如下图所示完美解决。小结本篇首先介绍了消息发送者所有的配置参数及其基本含义紧接着详细介绍了 RocketMQ 消息发送故障规避机制、消息客户端 ID 的生成策略以及实战中如何使用并且告知如何避坑。

更多文章