一、NameServer的架构设计
Broker启动的时候会向所有的NameServer注册,生产者在发送消息时会先从NameServer中获取Broker消息服务器的地址列表,根据负载均衡算法选取一台Broker消息服务器发送消息。NameServer与每台Broker之间保持着长连接,并且每隔10秒会检查Broker是否存活,如果检测到Broker超过120秒未发送心跳,则从路由注册表中将该Broker移除。
但是路由的变化不会马上通知消息生产者,这是为了降低NameServe的复杂性,所以在RocketMQ中需要消息的发送端提供容错机制来保证消息发送的高可用性,这在后续关于RocketMQ消息发送的章节会介绍。
二、从源码来看NameServer的启动流程
2.1 源码获取
Github上的仓库地址:https://github.com/apache/rocketmq ,如果访问速度较慢的去Gitee上获取源码,这里也把Gitee的地址贴上:https://gitee.com/apache/rocketmq
这里我使用的RocketMQ版本是4.9.4。
2.2 Window上使用IDEA运行RocketMQ
因为是在学习阶段,所以把RokcetMQ的代码拉到本地,这里我使用IDEA运行调试代码,在学习的时候也可以加上自己的注释。
1、把RocketMQ的代码拉到本地之后,切换分支到自己想要用的分支即可,这里我是切换到release-4.9.4
2、使用IDEA打开rocketmq
,等IDEA构建完成之后,目录如下。
3、图里的ROCKETMQ文件夹是我创建,主要有conf文件夹,并且将distribution
目录下的conf目录下的内容复制过来。
修改broker.conf的内容:其中一些路径自行修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 存储路径E:\java\source\rocketmq\ROCKETMQ
storePathRootDir=E:\\java\\source\\rocketmq\\ROCKETMQ\\store
# CommitLog存储路径
storePathCommitLog=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\index
# checkpoint文件存储路径
storeCheckpoint=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\checkpoint
# abort文件存储路径
abortFile=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\abort
|
还有三个logback日志文件配置:logback_broker.xml、logback_namesrv.xml、logback_tools.xml。这里我主要的修改是将${USER_HOME}
改成${ROCKETMQ_HOME}
。在Window下这些日志会被存放到C盘,这里我通过在启动Broker和NameServer时指定一个环境变量:ROCKETMQ_HOME
,也就是我创建的ROCKETMQ文件夹,主要是学习调试的时候方便查看日志。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<appender name="DefaultAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${ROCKETMQ_HOME}/logs/rocketmqlogs/${brokerLogDir}/broker_default.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${ROCKETMQ_HOME}/logs/rocketmqlogs/otherdays/${brokerLogDir}/broker_default.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
|
IDEA中启动Broker和NameServer时执行的环境变量如下图所示:
Broker启动时候指定配置文件:
4、当NameServer和Broker启动之后,便可去example文件的quickstart包运行Consumer和Producer测试了。
2.2 启动流程
NameServer的启动类是org.apache.rocketmq.namesrv.NamesrvStartup#main0(String args)
。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创建NamesrvController
NamesrvController controller = createNamesrvController(args);
// 启动
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
|
第一步
createNamesrvController方案接收参数解析配置文件,配置NamesrvConfig和NettyServerConfig的属性。
对于配置的输入有两种:1、通过 -c 配置文件的路径
。2、使用 --属性名 属性值
命令输入,例如:--listenPort 9876
。
如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 创建NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// 创建NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
// -c 指定配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// ....省略
return controller;
}
|
在创建NamesrvConfig时属性读取:
- rocketmqHome:RocketMQ的主目录,可以通过启动参数
-Drocketmq.home.dir=路径
或者设置环境变量ROCKETMQ_HOME来配置RocketMQ的主目录。
- kvConfigPath:NameServer存储KV配置属性的持久化路径
- configStorePath:NameServer默认配置文件路径。NameServer启动时如果需要通过配置文件配置NameServer启动属性使用
-c
指定。
- orderMessageEnable:是否支持顺序消息,默认不支持。
如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
// ...setter getter
}
|
在创建NettyServerConfig时属性读取:
如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
private boolean serverPooledByteBufAllocatorEnable = true;
// ....setter getter
}
|
- listenPort:NameServer监听端口,该值默认会被初始化为9876,在
createNamesrvController
方法是会重新指定端口。
- serverWorkerThreads:Netty业务线程池线程个数。
- serverCallbackExecutorThreads:Netty public任务线程池线程个数。Netty网络会根据业务类型创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。
- serverSelectorThreads:I/O线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数。这类线程主要用于处理网络请求,先解析请求包,然后转发到各个业务线程池完成具体的业务操作,最后将结果返回给调用方。
- serverOnewaySemaphoreValue:send oneway消息请求的并发度(Broker端参数)。
- serverAsyncSemaphoreValue:异步消息发送的最大并发度(Broker端参数)。
- serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认为120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
- serverSocketSndBufSize:网络socket发送缓存区大小,默认为64KB。
- servrSocketRcvBufSize:网络socket接收缓存区大小,默认为64KB。
- serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。
- useEpollNativeSelector:是否启用Epoll I/O模型,Linux环境下建议开启。
这些属性,可以通过启动参数:--属性名 属性值
来指定。
第二步
根据启动属性创建NamesrvController实列并初始化,NamesrvController实例时NameServer的核心控制器。org.apache.rocketmq.namesrv.NamesrvStartup#main0
调用org.apache.rocketmq.namesrv.NamesrvStartup#start
,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化 NamesrvController
boolean initResult = controller.initialize();
// 初始化失败,退出
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册JVM关闭钩子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
// 启动 NamesrvController
controller.start();
return controller;
}
|
关于注册JVM关闭钩子,这是很常见的用法,确保在关闭JVM的时候,先将线程池关闭,释放资源。
这里我们看一下org.apache.rocketmq.namesrv.NamesrvController#initialize
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();
// 创建NettyServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
// 每10秒扫描brokerLiveTable
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// 每10分钟打印KV配置
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
// ... 省略tls相关代码
return true;
}
|
这里会启动两个定时任务:
- NameServer每隔10秒扫描一次brokerLiveTable,移除处于未激活状态的Broker。
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public int scanNotActiveBroker() {
int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果收到broker上一次心跳包的时间小于120秒,则移除该broker的信息
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 关闭连接
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount++;
}
}
return removeCount;
}
|
- 另外一个定时任务是,每隔10分钟打印一次KV配置。
三、小结
关于NameServer的启动流程先简单的介绍到这里,从源码上了解到了关于NameServer的一些启动参数及其启动流程。在最后的scanNotActiveBroker
方法,有一个类RouteInfoManager
其功能是NameServer的路由信息管理类,下一篇文章将慢慢地了解其原理,以及在本文所说的路由信息到底又有什么内容。