一、NameServer的架构设计
Broker启动的时候会向所有的NameServer注册,生产者在发送消息时会先从NameServer中获取Broker消息服务器的地址列表,根据负载均衡算法选取一台Broker消息服务器发送消息。NameServer与每台Broker之间保持着长连接,并且每隔10秒会检查Broker是否存活,如果检测到Broker超过120秒未发送心跳,则从路由注册表中将该Broker移除。
但是路由的变化不会马上通知消息生产者,这是为了降低NameServe的复杂性,所以在RocketMQ中需要消息的发送端提供容错机制来保证消息发送的高可用性,这在后续关于RocketMQ消息发送的章节会介绍。
二、从源码来看NameServer的启动流程
2.1 源码获取
Github上的仓库地址:https://github.com/apache/rocketmq ,如果访问速度较慢的去Gitee上获取源码,这里也把Gitee的地址贴上:https://gitee.com/apache/rocketmq
这里我使用的RocketMQ版本是4.9.4。
2.2 Window上使用IDEA运行RocketMQ
因为是在学习阶段,所以把RokcetMQ的代码拉到本地,这里我使用IDEA运行调试代码,在学习的时候也可以加上自己的注释。
1、把RocketMQ的代码拉到本地之后,切换分支到自己想要用的分支即可,这里我是切换到release-4.9.4
2、使用IDEA打开rocketmq
,等IDEA构建完成之后,目录如下。
3、图里的ROCKETMQ文件夹是我创建,主要有conf文件夹,并且将distribution
目录下的conf目录下的内容复制过来。
修改broker.conf的内容:其中一些路径自行修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 存储路径E:\java\source\rocketmq\ROCKETMQ
storePathRootDir=E:\\java\\source\\rocketmq\\ROCKETMQ\\store
# CommitLog存储路径
storePathCommitLog=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\index
# checkpoint文件存储路径
storeCheckpoint=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\checkpoint
# abort文件存储路径
abortFile=E:\\java\\source\\rocketmq\\ROCKETMQ\\store\\abort
Copy 还有三个logback日志文件配置:logback_broker.xml、logback_namesrv.xml、logback_tools.xml。这里我主要的修改是将${USER_HOME}
改成${ROCKETMQ_HOME}
。在Window下这些日志会被存放到C盘,这里我通过在启动Broker和NameServer时指定一个环境变量:ROCKETMQ_HOME
,也就是我创建的ROCKETMQ文件夹,主要是学习调试的时候方便查看日志。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<appender name= "DefaultAppender"
class= "ch.qos.logback.core.rolling.RollingFileAppender" >
<file> ${ROCKETMQ_HOME}/logs/rocketmqlogs/${brokerLogDir}/broker_default.log</file>
<append> true</append>
<rollingPolicy class= "ch.qos.logback.core.rolling.FixedWindowRollingPolicy" >
<fileNamePattern> ${ROCKETMQ_HOME}/logs/rocketmqlogs/otherdays/${brokerLogDir}/broker_default.%i.log.gz</fileNamePattern>
<minIndex> 1</minIndex>
<maxIndex> 10</maxIndex>
</rollingPolicy>
<triggeringPolicy class= "ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy" >
<maxFileSize> 100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern> %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class= "java.nio.charset.Charset" > UTF-8</charset>
</encoder>
</appender>
Copy IDEA中启动Broker和NameServer时执行的环境变量如下图所示:
Broker启动时候指定配置文件:
4、当NameServer和Broker启动之后,便可去example文件的quickstart包运行Consumer和Producer测试了。
2.2 启动流程
NameServer的启动类是org.apache.rocketmq.namesrv.NamesrvStartup#main0(String args)
。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main ( String [] args ) {
main0 ( args );
}
public static NamesrvController main0 ( String [] args ) {
try {
// 创建NamesrvController
NamesrvController controller = createNamesrvController ( args );
// 启动
start ( controller );
String tip = "The Name Server boot success. serializeType=" + RemotingCommand . getSerializeTypeConfigInThisServer ();
log . info ( tip );
System . out . printf ( "%s%n" , tip );
return controller ;
} catch ( Throwable e ) {
e . printStackTrace ();
System . exit ( - 1 );
}
return null ;
}
Copy 第一步
createNamesrvController方案接收参数解析配置文件,配置NamesrvConfig和NettyServerConfig的属性。
对于配置的输入有两种:1、通过 -c 配置文件的路径
。2、使用 --属性名 属性值
命令输入,例如:--listenPort 9876
。
如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static NamesrvController createNamesrvController ( String [] args ) throws IOException , JoranException {
System . setProperty ( RemotingCommand . REMOTING_VERSION_KEY , Integer . toString ( MQVersion . CURRENT_VERSION ));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil . buildCommandlineOptions ( new Options ());
commandLine = ServerUtil . parseCmdLine ( "mqnamesrv" , args , buildCommandlineOptions ( options ), new PosixParser ());
if ( null == commandLine ) {
System . exit ( - 1 );
return null ;
}
// 创建NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig ();
// 创建NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig ();
nettyServerConfig . setListenPort ( 9876 );
// -c 指定配置文件
if ( commandLine . hasOption ( 'c' )) {
String file = commandLine . getOptionValue ( 'c' );
if ( file != null ) {
InputStream in = new BufferedInputStream ( new FileInputStream ( file ));
properties = new Properties ();
properties . load ( in );
MixAll . properties2Object ( properties , namesrvConfig );
MixAll . properties2Object ( properties , nettyServerConfig );
namesrvConfig . setConfigStorePath ( file );
System . out . printf ( "load config properties file OK, %s%n" , file );
in . close ();
}
}
// ....省略
return controller ;
}
Copy 在创建NamesrvConfig时属性读取:
rocketmqHome:RocketMQ的主目录,可以通过启动参数-Drocketmq.home.dir=路径
或者设置环境变量ROCKETMQ_HOME来配置RocketMQ的主目录。
kvConfigPath:NameServer存储KV配置属性的持久化路径
configStorePath:NameServer默认配置文件路径。NameServer启动时如果需要通过配置文件配置NameServer启动属性使用-c
指定。
orderMessageEnable:是否支持顺序消息,默认不支持。
如下:
1
2
3
4
5
6
7
8
9
10
11
12
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory . getLogger ( LoggerName . NAMESRV_LOGGER_NAME );
private String rocketmqHome = System . getProperty ( MixAll . ROCKETMQ_HOME_PROPERTY , System . getenv ( MixAll . ROCKETMQ_HOME_ENV ));
private String kvConfigPath = System . getProperty ( "user.home" ) + File . separator + "namesrv" + File . separator + "kvConfig.json" ;
private String configStorePath = System . getProperty ( "user.home" ) + File . separator + "namesrv" + File . separator + "namesrv.properties" ;
private String productEnvName = "center" ;
private boolean clusterTest = false ;
private boolean orderMessageEnable = false ;
// ...setter getter
}
Copy 在创建NettyServerConfig时属性读取:
如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888 ;
private int serverWorkerThreads = 8 ;
private int serverCallbackExecutorThreads = 0 ;
private int serverSelectorThreads = 3 ;
private int serverOnewaySemaphoreValue = 256 ;
private int serverAsyncSemaphoreValue = 64 ;
private int serverChannelMaxIdleTimeSeconds = 120 ;
private int serverSocketSndBufSize = NettySystemConfig . socketSndbufSize ;
private int serverSocketRcvBufSize = NettySystemConfig . socketRcvbufSize ;
private int writeBufferHighWaterMark = NettySystemConfig . writeBufferHighWaterMark ;
private int writeBufferLowWaterMark = NettySystemConfig . writeBufferLowWaterMark ;
private int serverSocketBacklog = NettySystemConfig . socketBacklog ;
private boolean serverPooledByteBufAllocatorEnable = true ;
// ....setter getter
}
Copy
listenPort:NameServer监听端口,该值默认会被初始化为9876,在createNamesrvController
方法是会重新指定端口。
serverWorkerThreads:Netty业务线程池线程个数。
serverCallbackExecutorThreads:Netty public任务线程池线程个数。Netty网络会根据业务类型创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由public线程池执行。
serverSelectorThreads:I/O线程池线程个数,主要是NameServer、Broker端解析请求、返回相应的线程个数。这类线程主要用于处理网络请求,先解析请求包,然后转发到各个业务线程池完成具体的业务操作,最后将结果返回给调用方。
serverOnewaySemaphoreValue:send oneway消息请求的并发度(Broker端参数)。
serverAsyncSemaphoreValue:异步消息发送的最大并发度(Broker端参数)。
serverChannelMaxIdleTimeSeconds:网络连接最大空闲时间,默认为120s。如果连接空闲时间超过该参数设置的值,连接将被关闭。
serverSocketSndBufSize:网络socket发送缓存区大小,默认为64KB。
servrSocketRcvBufSize:网络socket接收缓存区大小,默认为64KB。
serverPooledByteBufAllocatorEnable:ByteBuffer是否开启缓存,建议开启。
useEpollNativeSelector:是否启用Epoll I/O模型,Linux环境下建议开启。
这些属性,可以通过启动参数:--属性名 属性值
来指定。
第二步
根据启动属性创建NamesrvController实列并初始化,NamesrvController实例时NameServer的核心控制器。org.apache.rocketmq.namesrv.NamesrvStartup#main0
调用org.apache.rocketmq.namesrv.NamesrvStartup#start
,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static NamesrvController start ( final NamesrvController controller ) throws Exception {
if ( null == controller ) {
throw new IllegalArgumentException ( "NamesrvController is null" );
}
// 初始化 NamesrvController
boolean initResult = controller . initialize ();
// 初始化失败,退出
if ( ! initResult ) {
controller . shutdown ();
System . exit ( - 3 );
}
// 注册JVM关闭钩子
Runtime . getRuntime (). addShutdownHook ( new ShutdownHookThread ( log , ( Callable < Void > ) () -> {
controller . shutdown ();
return null ;
}));
// 启动 NamesrvController
controller . start ();
return controller ;
}
Copy 关于注册JVM关闭钩子,这是很常见的用法,确保在关闭JVM的时候,先将线程池关闭,释放资源。
这里我们看一下org.apache.rocketmq.namesrv.NamesrvController#initialize
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean initialize () {
// 加载KV配置
this . kvConfigManager . load ();
// 创建NettyServer
this . remotingServer = new NettyRemotingServer ( this . nettyServerConfig , this . brokerHousekeepingService );
this . remotingExecutor =
Executors . newFixedThreadPool ( nettyServerConfig . getServerWorkerThreads (), new ThreadFactoryImpl ( "RemotingExecutorThread_" ));
this . registerProcessor ();
// 每10秒扫描brokerLiveTable
this . scheduledExecutorService . scheduleAtFixedRate ( NamesrvController . this . routeInfoManager :: scanNotActiveBroker , 5 , 10 , TimeUnit . SECONDS );
// 每10分钟打印KV配置
this . scheduledExecutorService . scheduleAtFixedRate ( NamesrvController . this . kvConfigManager :: printAllPeriodically , 1 , 10 , TimeUnit . MINUTES );
// ... 省略tls相关代码
return true ;
}
Copy 这里会启动两个定时任务:
NameServer每隔10秒扫描一次brokerLiveTable,移除处于未激活状态的Broker。org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public int scanNotActiveBroker () {
int removeCount = 0 ;
Iterator < Entry < String , BrokerLiveInfo >> it = this . brokerLiveTable . entrySet (). iterator ();
while ( it . hasNext ()) {
Entry < String , BrokerLiveInfo > next = it . next ();
long last = next . getValue (). getLastUpdateTimestamp ();
// 如果收到broker上一次心跳包的时间小于120秒,则移除该broker的信息
if (( last + BROKER_CHANNEL_EXPIRED_TIME ) < System . currentTimeMillis ()) {
RemotingUtil . closeChannel ( next . getValue (). getChannel ());
it . remove ();
log . warn ( "The broker channel expired, {} {}ms" , next . getKey (), BROKER_CHANNEL_EXPIRED_TIME );
// 关闭连接
this . onChannelDestroy ( next . getKey (), next . getValue (). getChannel ());
removeCount ++ ;
}
}
return removeCount ;
}
Copy
另外一个定时任务是,每隔10分钟打印一次KV配置。
三、小结
关于NameServer的启动流程先简单的介绍到这里,从源码上了解到了关于NameServer的一些启动参数及其启动流程。在最后的scanNotActiveBroker
方法,有一个类RouteInfoManager
其功能是NameServer的路由信息管理类,下一篇文章将慢慢地了解其原理,以及在本文所说的路由信息到底又有什么内容。