前言
在前面所介绍的RocketMQ的文章中,介绍了RocketMQ的消息发送方式有3种:同步、异步和单向。具体的可查看:RocketMQ的主要组件及其功能
在接下来的关于RocketMQ的消息发送的文章中,可以带着几个问题来阅读:
1、RocketMQ的消息队列是如何负载的?
2、RocketMQ的消息发送如何实现消息发送是如何实现高可用的?
3、当批量发送消息的时候如何实现一致性?
一、RocketMQ消息的组成
RocketMQ的消息实现类是:org.apache.rocketmq.common.message.Message
|
|
消息的组成有:消息所属的Topic、消息标记(RocketMQ不做处理),扩展属性,消息体以及RocketMQ 4.3.0引入的事务消息相关的事务Id。
看一下上面代码Message的一个有参构造函数的tags、keys以及waitStoreMsgOK都会被添加的扩展属性(properties):
- tags设置消息的tag,可用于消息过滤。设置到扩展信息保存时,Key为
TAGS
。 - keys消息索引键,用空格隔开,RocketMQ可以根基这些key来快速检索信息。保存到扩展信息时,Key为
KEYS
。 - waitStoreMsgOK:消息发送是是否等消息存储完成后再返回。保存到扩展信息时,Key为
WAIT
。
二、消息的Topic路由机制
生产者在向Topic发送消息是时,需要查询Topic的路由信息。初次发送时会根据Topic的名称向NameServer集群查询Topic的路由信息,然后将其存储在本地内存缓存中,并且每隔30秒遍历中的Topic,向NameServer查询最新的路由信息。如果成功查询到路由信息,会将这些路由信息更新到本地缓存中,以此来实现Topic路由信息的动态感知。
RocketMQ跟一些消息中间件一样,运行的时候如果Topic不存在可以自动创建Topic。生产者在向Topic发送消息的时候,如果Topic时不存在的,在向NameServer查询该主题的路由信息的时候会先返回空,如果开启了自动创建Topic的机制(没错,这个机制是可以禁用的),会用一个默认的Topic再次向NameServer查询路由信息,然后生产者会使用默认Topic的路由信息进行负载均衡,但不会直接使用默认路由信息作为新主题创建对应的路由信息。这里画了一个时序图,可以了解一下:
三、RocketMQ消息的高可用设计原理
生产者在获取到Topic的路由信息之后,RocketMQ默认使用轮询算法进行路由的负载均衡。当然RocketMQ也是支持自定义负载均衡算法,但是在使用自定义的路由负载均衡算法之后,RocketMQ的重试机制会失效。因为RocketMQ在实现消息发送的高可用的时候引入了两个重要特性:
- 消息发送重试机制。在发送消息出现失败的时候,默认会重试两次。
- 故障规避机制。在消息第一次发送失败之后,下一次发送到刚刚失败的Broker上大概是失败的。因此为了保证重试的可靠性,在重试的时候会尽量避免上一次发送失败的Broker,而是选择其他Broker上的队列进行发送,从而提高消息发送的成功率。
这里画了一个流程图,可以看一下:
在接下来的消息发送、消息存储、消息消费之前,我们整体了解一下消息发送的流程:
四、总结
本文介绍了RockMQ中的消息设计及其高可用的原理,接下来将介绍RocketMQ的消息发送流程