Featured image of post RocketMQ消息发送的设计

RocketMQ消息发送的设计

本文带你了解RocketMQ的消息都有什么组成和消息发送的设计

前言

在前面所介绍的RocketMQ的文章中,介绍了RocketMQ的消息发送方式有3种:同步、异步和单向。具体的可查看:RocketMQ的主要组件及其功能

在接下来的关于RocketMQ的消息发送的文章中,可以带着几个问题来阅读:

1、RocketMQ的消息队列是如何负载的?

2、RocketMQ的消息发送如何实现消息发送是如何实现高可用的?

3、当批量发送消息的时候如何实现一致性?

一、RocketMQ消息的组成

RocketMQ的消息实现类是:org.apache.rocketmq.common.message.Message

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
    
    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0) {
            this.setTags(tags);
        }

        if (keys != null && keys.length() > 0) {
            this.setKeys(keys);
        }

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }
    
    // ... setter getter constructor
}

消息的组成有:消息所属的Topic、消息标记(RocketMQ不做处理),扩展属性,消息体以及RocketMQ 4.3.0引入的事务消息相关的事务Id。

看一下上面代码Message的一个有参构造函数的tags、keys以及waitStoreMsgOK都会被添加的扩展属性(properties):

  • tags设置消息的tag,可用于消息过滤。设置到扩展信息保存时,Key为TAGS
  • keys消息索引键,用空格隔开,RocketMQ可以根基这些key来快速检索信息。保存到扩展信息时,Key为KEYS
  • waitStoreMsgOK:消息发送是是否等消息存储完成后再返回。保存到扩展信息时,Key为WAIT

二、消息的Topic路由机制

生产者在向Topic发送消息是时,需要查询Topic的路由信息。初次发送时会根据Topic的名称向NameServer集群查询Topic的路由信息,然后将其存储在本地内存缓存中,并且每隔30秒遍历中的Topic,向NameServer查询最新的路由信息。如果成功查询到路由信息,会将这些路由信息更新到本地缓存中,以此来实现Topic路由信息的动态感知。

RocketMQ跟一些消息中间件一样,运行的时候如果Topic不存在可以自动创建Topic。生产者在向Topic发送消息的时候,如果Topic时不存在的,在向NameServer查询该主题的路由信息的时候会先返回空,如果开启了自动创建Topic的机制(没错,这个机制是可以禁用的),会用一个默认的Topic再次向NameServer查询路由信息,然后生产者会使用默认Topic的路由信息进行负载均衡,但不会直接使用默认路由信息作为新主题创建对应的路由信息。这里画了一个时序图,可以了解一下:

三、RocketMQ消息的高可用设计原理

生产者在获取到Topic的路由信息之后,RocketMQ默认使用轮询算法进行路由的负载均衡。当然RocketMQ也是支持自定义负载均衡算法,但是在使用自定义的路由负载均衡算法之后,RocketMQ的重试机制会失效。因为RocketMQ在实现消息发送的高可用的时候引入了两个重要特性:

  1. 消息发送重试机制。在发送消息出现失败的时候,默认会重试两次。
  2. 故障规避机制。在消息第一次发送失败之后,下一次发送到刚刚失败的Broker上大概是失败的。因此为了保证重试的可靠性,在重试的时候会尽量避免上一次发送失败的Broker,而是选择其他Broker上的队列进行发送,从而提高消息发送的成功率。

这里画了一个流程图,可以看一下:

在接下来的消息发送、消息存储、消息消费之前,我们整体了解一下消息发送的流程:

四、总结

本文介绍了RockMQ中的消息设计及其高可用的原理,接下来将介绍RocketMQ的消息发送流程

Licensed under CC BY-NC-SA 4.0