系列
- RocketMq 訂閱分組創(chuàng)建和刪除
- RocketMq Topic創(chuàng)建和刪除
- RocketMq Topic權(quán)限變更
- RocketMq MQAdmin啟動過程
開篇
- 這個(gè)系列主要用以分析mqadmin常見的比較核心的幾個(gè)命令,主要包括訂閱分組和topic的創(chuàng)建和刪除凌停、Topic的權(quán)限變更速勇、MQAdmin的啟動過程。
- 這篇文章主要是用來分析MQAdmin的啟動過程,核心在于namesrv地址的傳遞以及對應(yīng)的通信Channel的創(chuàng)建。
MQAdmin啟動過后
MQAdmin的啟動過程核心邏輯如下
- 注冊各類mqadmin對應(yīng)的命令和處理函數(shù)。
- 通過System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr)在當(dāng)前執(zhí)行環(huán)境中保存rocketmq集群的namesrv地址痪枫,所有命令執(zhí)行都需要namesrv地址垛吗。
- MQAdmin啟動過程中創(chuàng)建DefaultMQAdminExt對象過程中會通過System.getProperty方法獲取namesrv的地址創(chuàng)建Channel對象凹髓。
MQAdminStartup
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
private static String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
public static void main(String[] args) {
main0(args, null);
}
public static void main0(String[] args, RPCHook rpcHook) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// 注冊所有的命令行
initCommand();
try {
initLogback();
switch (args.length) {
case 0:
printHelp();
break;
case 2:
if (args[0].equals("help")) {
SubCommand cmd = findSubCommand(args[1]);
if (cmd != null) {
Options options = ServerUtil.buildCommandlineOptions(new Options());
options = cmd.buildCommandlineOptions(options);
if (options != null) {
ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
}
} else {
System.out.printf("The sub command %s not exist.%n", args[1]);
}
break;
}
case 1:
default:
SubCommand cmd = findSubCommand(args[0]);
if (cmd != null) {
String[] subargs = parseSubArgs(args);
Options options = ServerUtil.buildCommandlineOptions(new Options());
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
return;
}
// mqadmin啟動的時(shí)候通過-n 參數(shù)指定namesrvAddr
// 啟動的時(shí)候會在系統(tǒng)變量中寫入"rocketmq.namesrv.addr"
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, AclUtils.getAclRPCHook(rocketmqHome + MixAll.ACL_CONF_TOOLS_FILE));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
}
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 注冊各類mqadmin對應(yīng)的命令和處理函數(shù)
public static void initCommand() {
initCommand(new UpdateTopicSubCommand());
initCommand(new DeleteTopicSubCommand());
initCommand(new UpdateSubGroupSubCommand());
initCommand(new DeleteSubscriptionGroupCommand());
initCommand(new UpdateBrokerConfigSubCommand());
initCommand(new UpdateTopicPermSubCommand());
}
}
- 注冊各類mqadmin對應(yīng)的命令和處理函數(shù)。
- System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr)啟動的時(shí)候會在系統(tǒng)變量中寫入"rocketmq.namesrv.addr"怯屉。
DefaultMQAdminExt
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private String adminExtGroup = "admin_ext_group";
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
private long timeoutMillis = 5000;
// 構(gòu)造函數(shù)初始化DefaultMQAdminExtImpl對象
public DefaultMQAdminExt(RPCHook rpcHook) {
this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
}
}
public class ClientConfig {
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
// ClientConfig的namesrvAddr通過NameServerAddressUtils.getNameServerAddresses()獲取
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
protected AccessChannel accessChannel = AccessChannel.LOCAL;
}
public class NameServerAddressUtils {
public static final String INSTANCE_PREFIX = "MQ_INST_";
public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+";
public static final String ENDPOINT_PREFIX = "http://";
public static final Pattern NAMESRV_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + ".*");
public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*");
// 負(fù)責(zé)通過System.getProperty獲取namesrv的地址
public static String getNameServerAddresses() {
// NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
}
}
- DefaultMQAdminExt的父類ClientConfig的namesrvAddr是從System.getProperty("rocketmq.namesrv.addr")獲取的namesrvAddr地址蔚舀。
- 創(chuàng)建DefaultMQAdminExtImpl的對象過程中會把DefaultMQAdminExt當(dāng)作參數(shù)傳遞,該參數(shù)同時(shí)是ClientConfig對象锨络。
- ClientConfig的對象包含namesrv地址信息赌躺。
DefaultMQAdminExtImpl
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
// getOrCreateMQClientInstance會創(chuàng)建mqClientInstance
this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
}
- DefaultMQAdminExtImpl內(nèi)部通過MQClientManager創(chuàng)建mqClientInstance對象。
MQClientManager
public class MQClientManager {
private final static InternalLogger log = ClientLogger.getLog();
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 通過ClientConfig創(chuàng)建MQClientInstance對象
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
}
- MQClientManager內(nèi)部創(chuàng)建MQClientInstance對象羡儿。
MQClientInstance
public class MQClientInstance {
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
// clientConfig.getNamesrvAddr() 在這里不為空
if (this.clientConfig.getNamesrvAddr() != null) {
// mQClientAPIImpl的updateNameServerAddressList更新namesrv地址
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
}
- clientConfig.getNamesrvAddr()不為空寿谴,會更新mQClientAPIImpl的namesrv的地址。
MQClientAPIImpl
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
}
private final RemotingClient remotingClient;
private final TopAddressing topAddressing;
private final ClientRemotingProcessor clientRemotingProcessor;
private String nameSrvAddr = null;
private ClientConfig clientConfig;
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
}
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> list = Arrays.asList(addrArray);
// 更新remotingClient的namesrv的地址
this.remotingClient.updateNameServerAddressList(list);
}
}
- updateNameServerAddressList會更新remotingClient的namesrv的地址失受。
NettyRemotingClient
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
// 負(fù)責(zé)保存namesrv的地址
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
public void updateNameServerAddressList(List<String> addrs) {
List<String> old = this.namesrvAddrList.get();
boolean update = false;
if (!addrs.isEmpty()) {
if (null == old) {
update = true;
} else if (addrs.size() != old.size()) {
update = true;
} else {
for (int i = 0; i < addrs.size() && !update; i++) {
if (!old.contains(addrs.get(i))) {
update = true;
}
}
}
if (update) {
Collections.shuffle(addrs);
log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
this.namesrvAddrList.set(addrs);
}
}
}
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
// 創(chuàng)建NameserverChannel的時(shí)候會取namesrv地址
final List<String> addrList = this.namesrvAddrList.get();
if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
}
} finally {
this.lockNamesrvChannel.unlock();
}
}
return null;
}
}
- NettyRemotingClient的namesrvAddrList保存namesrv的地址讶泰。
- NettyRemotingClient的getAndCreateNameserverChannel方法獲取namesrvAddrList的地址創(chuàng)建namesrv的通信channel。