Featured image of post RocketMQ的NameServer的路由管理:路由的注册、删除与发现

RocketMQ的NameServer的路由管理:路由的注册、删除与发现

从源码分析NameServer的路由信息管理功能:路由元信息、路由注册、路由删除和路由发现

一、NameServer的路由元信息

NameServer的主要作用是为消息生产者和消息消费者提供Topic的路由信息。所以NameServer需要保存和管理路由的基础信息。

NameServer的路由管理的实现类是:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // 120秒,broker 上一次心跳时间超过这个数便会被剔除
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<>(1024);
        this.brokerAddrTable = new HashMap<>(128);
        this.clusterAddrTable = new HashMap<>(32);
        this.brokerLiveTable = new HashMap<>(256);
        this.filterServerTable = new HashMap<>(256);
    }
    //...省略
}

org.apache.rocketmq.common.protocol.route.QueueData

1
2
3
4
5
6
7
8
public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSysFlag;
    // ... setter getter
}

org.apache.rocketmq.common.protocol.route.BrokerData

1
2
3
4
5
6
7
8
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    private final Random random = new Random();
    // ... setter getter
}

org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;

    public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
                          String haServerAddr) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
        this.dataVersion = dataVersion;
        this.channel = channel;
        this.haServerAddr = haServerAddr;
    }
    // ... setter getter
}
  • topicQueueTable:topic消息队列的路由信息,消息发送的时候会根据路由表进行负载均衡。Key为topic名称,value也是一个Map:以brokerName为key,value是队列数据如上代码所示,包含读/写队列数量、权重等。
  • brokerAddrTable:broker的基础信息,Key为brokerName,value包含brokerName,broker所在的集群信息,主备broker的地址。
  • clusterAddrTable:broker集群信息,Key为集群名称(clusterName),value存储的是集群中所有broker的名称(brokerName)。
  • brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。这也是NameServer每10秒要扫描的信息。
  • filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃

类图如下:

1.1 RocketMQ运行时的路由元信息

RocketMQ的一个Topic是可以有多个消息队列,一个Broker默认会为每一个Topic创建4个读队列和4个写队列。多个Broker组成一个集群,多个BrokerName一样的Broker组成主从架构。brokerId大于0表示从节点,brokerId等于0表示是主节点。假如配置如下的broker集群,集群名c1:

在启动Broker的时候,指定配置文件,修改broker配置文件的:brokerClusterName、brokerName和brokerId。

1.1.1 本地运行IDEA Debug查看运行时的路由元信息

关于本地调试可查看之前的文章:2.2 Window上使用IDEA运行RocketMQ

首先创建创建四个Broker的配置文件主要修改:brokerClusterName、brokerName和brokerId,注意store目录也需要修改,每个broker使用不同的store目录,或者是使用不同的rocketmq目录。listenPort也需要指定不同的端口,因为在本地调试使用的同一台电脑,而且listenProt的值不能相隔太近不然会报错:Address already in use: bind

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
brokerClusterName = c1
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10911
# 存储路径E:\java\source\rocketmq\ROCKETMQ
storePathRootDir=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0
# CommitLog存储路径
storePathCommitLog=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\index
# checkpoint文件存储路径
storeCheckpoint=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\checkpoint
# abort文件存储路径
abortFile=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\abort

IDEA运行四个Broker实例并指定配置文件,如下图所示:

配置完成之后,先以Debug模式运行NamesrvStartup(启动NameServer),再启动四个Broker。都启动完成之后,在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker方法里面打个断点。该方法每隔10秒扫描一次brokerLiveTable,移除处于未激活状态的Broker,路由元信息如下:

二、路由注册流程分析

Broker通过心跳机制向NameServer发送心跳包,每个隔30秒就会向NameServer集群发送心跳包,NameServer收到心跳包之后会先更新brokerLiveTable的lastUpdateTimestamp。NameServer每隔10秒就会扫描brokerLiveTable中各个Broker上报来的lastUpdateTimestamp,如果连续超过120秒没收到Broker的心跳包,NameServer会把该Broker的路由信息移除。

2.1 Broker发送心跳包

我们在本地是通过运行org.apache.rocketmq.broker.BrokerStartup#start方法启动,从这里开始看,进到BrokerController类的start方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void start() throws Exception {
    // ...省略一些代码
    
    // 定时任务每个30秒向NameServer注册路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        } // brokerConfig.getRegisterNameServerPeriod() ---> 30 * 1000
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }

}

点击org.apache.rocketmq.broker.BrokerController#registerBrokerAll方法查看:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    // topicConfig的包装
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
	// 设置topicConfig
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),

# NameServer的路由元信息

NameServer的主要作用是为消息生产者和消息消费者提供Topic的路由信息所以NameServer需要保存和管理路由的基础信息

NameServer的路由管理的实现类是:`org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager`:

```java
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // 120秒,broker 上一次心跳时间超过这个数便会被剔除
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<>(1024);
        this.brokerAddrTable = new HashMap<>(128);
        this.clusterAddrTable = new HashMap<>(32);
        this.brokerLiveTable = new HashMap<>(256);
        this.filterServerTable = new HashMap<>(256);
    }
    //...省略
}

org.apache.rocketmq.common.protocol.route.QueueData

1
2
3
4
5
6
7
8
public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSysFlag;
    // ... setter getter
}

org.apache.rocketmq.common.protocol.route.BrokerData

1
2
3
4
5
6
7
8
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    private final Random random = new Random();
    // ... setter getter
}

org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;

    public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
                          String haServerAddr) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
        this.dataVersion = dataVersion;
        this.channel = channel;
        this.haServerAddr = haServerAddr;
    }
    // ... setter getter
}
  • topicQueueTable:topic消息队列的路由信息,消息发送的时候会根据路由表进行负载均衡。Key为topic名称,value也是一个Map:以brokerName为key,value是队列数据如上代码所示,包含读/写队列数量、权重等。
  • brokerAddrTable:broker的基础信息,Key为brokerName,value包含brokerName,broker所在的集群信息,主备broker的地址。
  • clusterAddrTable:broker集群信息,Key为集群名称(clusterName),value存储的是集群中所有broker的名称(brokerName)。
  • brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。这也是NameServer每10秒要扫描的信息。
  • filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃

类图如下:

1.1 RocketMQ运行时的路由元信息

RocketMQ的一个Topic是可以有多个消息队列,一个Broker默认会为每一个Topic创建4个读队列和4个写队列。多个Broker组成一个集群,多个BrokerName一样的Broker组成主从架构。brokerId大于0表示从节点,brokerId等于0表示是主节点。假如配置如下的broker集群,集群名c1:

在启动Broker的时候,指定配置文件,修改broker配置文件的:brokerClusterName、brokerName和brokerId。

1.1.1 本地运行IDEA Debug查看运行时的路由元信息

关于本地调试可查看之前的文章:2.2 Window上使用IDEA运行RocketMQ

首先创建创建四个Broker的配置文件主要修改:brokerClusterName、brokerName和brokerId,注意store目录也需要修改,每个broker使用不同的store目录,或者是使用不同的rocketmq目录。listenPort也需要指定不同的端口,因为在本地调试使用的同一台电脑,而且listenProt的值不能相隔太近不然会报错:Address already in use: bind

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
brokerClusterName = c1
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10911
# 存储路径E:\java\source\rocketmq\ROCKETMQ
storePathRootDir=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0
# CommitLog存储路径
storePathCommitLog=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\index
# checkpoint文件存储路径
storeCheckpoint=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\checkpoint
# abort文件存储路径
abortFile=E:\\java\\source\\rocketmq\\ROCKETMQ\\store-a0\\abort

IDEA运行四个Broker实例并指定配置文件,如下图所示:

配置完成之后,先以Debug模式运行NamesrvStartup(启动NameServer),再启动四个Broker。都启动完成之后,在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker方法里面打个断点。该方法每隔10秒扫描一次brokerLiveTable,移除处于未激活状态的Broker,路由元信息如下:

二、路由注册流程分析

Broker通过心跳机制向NameServer发送心跳包,每个隔30秒就会向NameServer集群发送心跳包,NameServer收到心跳包之后会先更新brokerLiveTable的lastUpdateTimestamp。NameServer每隔10秒就会扫描brokerLiveTable中各个Broker上报来的lastUpdateTimestamp,如果连续超过120秒没收到Broker的心跳包,NameServer会把该Broker的路由信息移除。

2.1 Broker发送心跳包

我们在本地是通过运行org.apache.rocketmq.broker.BrokerStartup#start方法启动,从这里开始看,进到BrokerController类的start方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void start() throws Exception {
    // ...省略一些代码
    
    // 定时任务每个30秒向NameServer注册路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        } // brokerConfig.getRegisterNameServerPeriod() ---> 30 * 1000
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }

}

点击org.apache.rocketmq.broker.BrokerController#registerBrokerAll方法查看:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    // topicConfig的包装
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
	// 设置topicConfig
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }
	// 是否需要注册
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 执行路由注册
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

继续追踪查看org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {
    // 路由注册
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
	// 注册结果
    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}

BrokerController#doRegisterBrokerAll调用了org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法如下所示,BrokerOuterAPI#registerBrokerAll封装Broker的基本信息和Topic信息发送给NameServer,并且把注册结果放到集合里面返回,在BrokerController#doRegisterBrokerAll方法进行相关的处理(注册成功/失败)。

在向每个NameServer注册时都会启动一个线程池,使用的是专门的线程池BrokerFixedThreadPoolExecutor,其实这里也体现了线程池的一个使用:业务线程池和I/O线程池分开,不要阻塞I/O线程池,这中设计在Netty上也有体现。在工作中我们也应该要这样使用线程池。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
		// 构造注册请求的请求头
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
		// 请求体 ---> TopicConfig
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        // CountDownLatch 向每个NameServer的注册都启动一个线程处理
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // 遍历NameServer列表,向每个NameServer都
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(() -> {
                try {
                    // 发起心跳(请求)向NameServer注册
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    // 请求结果
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        try {
            // 等待向所有的NameServer都注册完
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
	// 返回注册结果
    return registerBrokerResultList;
}

从上面的代码我们是可以看到Broker发送的心跳包信息有Header(请求头)和Body(请求体)

请求头:

1、brokerAddr:broker地址。

2、brokerId:brokerId=0表示主节点,brokerId>0表示从节点。

3、brokerName:broker名称。

4、clusterName:集群名称。

5、haServerAddr:主节点地址,初次请求时该值为空,从节点向NameServer注册后返回。

请求体:

1、topicConfigWrapper,主题配置,topicConfigWrapper内部封装的是TopicConfig Manager中的topicConfigTable,内部存储的是Broker启动时默认的一些topic,如MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic-Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVENT、BrokerConfig#brokerClusterName、BrokerConfig#brokerName。Broker中topic默认存储在${ROCKET_HOME}/store/confg/topics.json中。

2、filterServerList,消息过滤服务器列表。

2.2 NameServer处理心跳包

Broker发送心跳包之后,由org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest来处理网络请求,从下方代码可见,根据不同的RequestCode进行不同操作,处理注册的RequestCode为REGISTER_BROKER。org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker方法大概的操作就是把请求头信息取出来,验证请求体并反序列化获取Topic信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    protected final NamesrvController namesrvController;

    public DefaultRequestProcessor(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }


        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                // 路由注册
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            // ...省略 case
            default:
                break;
        }
        return null;
    }

    //...省略代码
}

registerBrokerWithFilterServer方法或registerBroker注册的处理会被转发到org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker方法:

  • 加写锁,防止并发修改路由表。首先判断Broker所属的集群(clusterName)是否存在,如果不存在则创建集群(clusterAddrTable),然后将Broker的名称添加到集群的Broker集合中。
  • 维护BrokerData信息,先从brokerAddrTable中根据Broker的名称来获取BrokerData,如果不存在,则新建一个BrokerData并保存进brokerAddrTable,registerFirst设置为true。如果该Broker已经存在对应的BrokerData,直接替换掉原来的,registerFirst为false。registerFirst为true表示第一次注册。
  • 如果接收到的Broker信息为主节点,并且Broker的Topic配置发生了变化或者是第一次注册,则需要创建或更新Topic的路由元数据(QueueData),并且把路由元数据设置/更新到topicQueueTable。其实就是为默认主题自动注册路由信息,其中包含MixAll.DEFAULT_TOPIC的路由信息。当消息生产者发送消息到主题时,如果该主题未创建,并且BrokerConfig的autoCreateTopicEnable为true,则返回MixAll.DEFAULT_TOPIC的路由信息。
  • 更新brokerLiveTable,存储能正常使用的Broker信息。BrokerLiveInfo是执行路由删除操作的重要依据。
  • 注册Broker的过滤器Server地址列表,一个Broker会关联多个FilterServer消息过滤服务器。如果此Broker是从节点,还需要查找该Broker的主节点信息,并且更新对应的masterAdd属性。
  • 最后解锁,返回注册结果
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<>(1024);
        this.brokerAddrTable = new HashMap<>(128);
        this.clusterAddrTable = new HashMap<>(32);
        this.brokerLiveTable = new HashMap<>(256);
        this.filterServerTable = new HashMap<>(256);
    }
    // ...省略部分代码
    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            // org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 解析请求类型,如果请求类型是REGISTER_BROKER
            // 则会请求到此方法注册broker
            try {
                // 路由注册需要枷锁,防止并发修改RouteInfoManger中的路由表。
                this.lock.writeLock().lockInterruptibly();
                // 首先判断broker所在的集群是否存在,如果不存在则创建集群 new HashSet<String>
                Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
                // 将broker名加入集群broker集合
                brokerNames.add(brokerName);
                brokerNames.add(brokerName);

                boolean registerFirst = false;
                // 维护brokerData信息,先从brokerAddrTable中根据broker名获取broker信息
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                // 如果不存在,则创建
                if (null == brokerData) {
                    // 要注册broker,第一次注册
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                // 译:slave切换到master是 先把slave移除,再添加。同一个 IP:PORT 必须只有一个记录
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        log.debug("remove entry {} from brokerData", item);
                        it.remove();
                    }
                }
                // 把brokerId
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                if (MixAll.MASTER_ID == brokerId) {
                    log.info("cluster [{}] brokerName [{}] master address change from {} to {}",
                            brokerData.getCluster(), brokerData.getBrokerName(), oldAddr, brokerAddr);
                }

                registerFirst = registerFirst || (null == oldAddr);
				// Topic的配置信息不为空并且Broker是主节点
                if (null != topicConfigWrapper
                        && MixAll.MASTER_ID == brokerId) {
                    // 如果broker是主节点并且topic配置信息发生该表(dataVersion不一致)或者是初次注册,需要创建或更新topic路由元数据
                    // 并填充topicQueueTable,其实就是为默认主题自动注册路由信息,其中包含 MixAll.DEFAULT_TOPIC的路由信息。
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                            || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                                topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                // 更新或创建新的 QueueData
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                // 更新BrokerLiveInfo,存储状态正常的Broker信息表,BrokeLiveInfo是执行路由删除操作的重要依据
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                        new BrokerLiveInfo(
                                System.currentTimeMillis(),
                                topicConfigWrapper.getDataVersion(),
                                channel,
                                haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
				// 注册Broker的过滤器Server地址列表
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
				// 如果是从节点,设置其主节点
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }
    
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        // 创建队列信息
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
		// 如果不存在该队列的信息则新建 queueDataMap 存放到 topicQueueTable
        Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataMap) {
            queueDataMap = new HashMap<>();
            queueDataMap.put(queueData.getBrokerName(), queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataMap);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            // 存在,直接更新替换旧的
            QueueData old = queueDataMap.put(queueData.getBrokerName(), queueData);
            if (old != null && !old.equals(queueData)) {
                log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), old,
                        queueData);
            }
        }
    }
    // ...省略部分代码
}

NameServer与Broker保持着长连接,Broker的状态信息存储在brokerLive-Table中,NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServer-Table)。更新上述路由表(HashTable)使用了锁粒度较少的读写锁,允许多个消息发送者并发读操作,保证消息发送时的高并发。同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。

三、路由删除流程分析

NameServer会每隔10s扫描一次brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker的连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocketMQ有两个触发点来触发路由删除操作:

1、NameServer定时扫描brokerLiveTable,检测上次心跳包与当前系统时间的时间戳,如果时间戳大于120s,则需要移除该Broker信息。

2、Broker在正常关闭的情况下,会执行unregisterBroker指令。

但是不管是哪一个种方式出发的路由删除,删除方法都是一样的,都是从topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中移除Broker相关的信息。所以分析的入口还是从我们很熟悉的org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker方法开始。

很简单就是每10s执行一次。逻辑也很简单,先遍历brokerLiveInfo路由表(HashMap),检测BrokerLiveInfo的LastUpdateTimestamp上次收到心跳包的时间,如果超过120s,则认为该Broker已不可用,然后将它移除并关闭连接,最后删除与该Broker相关的路由信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public int scanNotActiveBroker() {
    int removeCount = 0;
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // 如果收到broker上一次心跳包的时间小于120秒,则移除该broker的信息
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // 关闭连接,topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中移除Broker相关的信息
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

            removeCount++;
        }
    }

    return removeCount;
}

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#onChannelDestroy:

  • 获取读锁,如果Channel不为空,就遍历brokerLiveTable尝试获取使用了该Channel的Broker。最后解锁。
  • 获取写锁,根据brokerAddr从brokerLiveTable、filterServerTable中移除Broker相关的信息。
  • 维护brokerAddrTable。遍历brokerAddrTable,从BrokerData的brokerAddrs中,找到具体的Broker,从BrokerData中将其移除。如果移除后在BrokerData中不再包含其他Broker,则在brokerAddrTable中移除该brokerName对应的条目。
  • 维护clusterAddrTable,也是遍历。找到Broker并将其从集群中基础。如果移除后,集群不包含任何Broker,则将该集群从clusterAddrTable中移除。
  • 维护topicQueueTable,遍历所有主题的队列,如果队列中包含要删除的Broker的队列,则移除,如果Topic只包含待移除Broker的队列,则从topicQueueTable删除该Topic
  • 释放写锁,完成路由删除操作。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // 获取读锁
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                // 遍历brokerLiveTable
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    // 获取使用该channel的brokerAddr
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
	// channel为空或者没有使用该channel的Broker
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                // 申请写锁
                this.lock.writeLock().lockInterruptibly();
                // 根据brokerAddr从brokerLiveTable、filterServerTable中移除Broker相关的信息
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                // 遍历 brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // 移除该 brokerAddr的信息
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                            break;
                        }
                    }

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                    }
                }

                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    // 遍历 clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);
							// 成功移除Broker之后,集群不包含任何Broker,则将该集群从clusterAddrTable中移除
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }

                if (removeBrokerName) {
                    String finalBrokerNameFound = brokerNameFound;
                    Set<String> needRemoveTopic = new HashSet<>();
					// 遍历 topicQueueTable
                    topicQueueTable.forEach((topic, queueDataMap) -> {
                        // 移除该Broker的队列
                        QueueData old = queueDataMap.remove(finalBrokerNameFound);
                        log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                topic, old);
						// 如果队列已经为空,移除该Topic
                        if (queueDataMap.size() == 0) {
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            // 该Topic添加到待移除的集合中
                            needRemoveTopic.add(topic);
                        }
                    });
					// 移除Topic
                    needRemoveTopic.forEach(topicQueueTable::remove);
                }
            } finally {
                // 释放写锁
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

四、路由发现流程分析

RocketMQ的路由发现不是实时的,当Topic的路由发生改变之后,NameServer并不会主动推送给Client。而是Client定时向NameServer拉去最新的Topic路由信息。所以是Client发请求来NameServer获取Topic信息,那么我们的分析入口就是org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法:RequestCode.GET_ROUTEINFO_BY_TOPIC拉取Topic路由信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    protected final NamesrvController namesrvController;

    public DefaultRequestProcessor(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }


        switch (request.getCode()) {
            // ...省略 case
            case RequestCode.REGISTER_BROKER:
                // 路由注册
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                // 获取路由信息
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            // ...省略 case
            default:
                break;
        }
        return null;
    }

    //...省略代码
}

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

  • 调用RouterInfoManager的pickupTopicRouteData方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充TopicRouteData中的List、List和 filterServer地址表。
  • 如果找到主题对应的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取关于顺序消息相关的配置填充路由信息。如果找不到路由信息Code,则使用TOPIC_NOT_EXISTS,表示没有找到对应的路由。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
	// 获取Topic的路由信息 RouteInfoManager ---> pickupTopicRouteData()
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
		// 序列化topicRouteData
        byte[] content;
        Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();
        if (request.getVersion() >= Version.V4_9_4.ordinal() || (null != standardJsonOnly && standardJsonOnly)) {
            content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
                SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                SerializerFeature.MapSortField);
        } else {
            content = RemotingSerializable.encode(topicRouteData);
        }

        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

TopicRouteData:

  • orderTopicConf:顺序消息配置内容,来自kvConfig
  • queueDatas:topic队列元数据
  • brokerDatas:topic分布的broker元数据

五、总结

本文从NameServer维护的路由元信息到路由注册、删除和发现都带大家了解了一遍。接下来是关于RocketMQ消息发送的文章了。

Licensed under CC BY-NC-SA 4.0