RocketMQ作為一款優(yōu)秀的中間件蜂大,應(yīng)用領(lǐng)域非常廣泛,金融蝶怔、電商奶浦、電信、醫(yī)療踢星、社科财喳、安保等不同的領(lǐng)域都有其大規(guī)模的應(yīng)用,無疑安全性很受質(zhì)疑斩狱,因?yàn)閮?nèi)部沒有安全相關(guān)的業(yè)務(wù)模塊,消息的發(fā)送和消費(fèi)得不到很好的安全管控需要業(yè)務(wù)方自己去封裝安全模塊扎瓶,無形中增加了使用成本所踊。在RocketMQ4.4.0版本升級中加入了ACL權(quán)限管控,這個功能的完善直接推動了RocketMQ在各個領(lǐng)域的推廣使用概荷,特別是金融秕岛、電商、安保等安全要求較高的領(lǐng)域误证。
1继薛、簡單使用
1.1、ACL是什么
ACL是access control list的簡稱愈捅,俗稱訪問控制列表遏考。訪問控制,基本上會涉及到用戶蓝谨、資源灌具、權(quán)限青团、角色等概念,那在RocketMQ中上述會對應(yīng)哪些對象呢咖楣?
用戶:用戶是訪問控制的基礎(chǔ)要素督笆,RocketMQ ACL必然也會引入用戶的概念,即支持用戶名诱贿、密碼娃肿。 資源:需要保護(hù)的對象,消息發(fā)送涉及的Topic珠十、消息消費(fèi)涉及的消費(fèi)組料扰,應(yīng)該進(jìn)行保護(hù),故可以抽象成資源宵睦。 權(quán)限:針對資源记罚,能進(jìn)行的操作。 角色:RocketMQ中壳嚎,只定義兩種角色:是否是管理員桐智。
1.2、RocketMQ中配置ACL
acl默認(rèn)的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下
需要使用acl必須在服務(wù)端開啟此功能烟馅,在Broker的配置文件中配置说庭,aclEnable = true開啟此功能
配置plain_acl.yml文件
globalWhiteRemoteAddresses:
- 10.10.15.*
- 192.168.0.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
下面我們介紹一下plain_acl.yml文件中相關(guān)的參數(shù)含義及使用
字段 | 取值 | 含義 | |
---|---|---|---|
globalWhiteRemoteAddresses | ;192.168..*;192.168.0.1 | 全局IP白名單 | |
accessKey | 字符串 | Access Key 用戶名 | |
secretKey | 字符串 | Secret Key 密碼 | |
whiteRemoteAddress | ;192.168..*;192.168.0.1 | 用戶IP白名單 | |
admin | true;false | 是否管理員賬戶 | |
defaultTopicPerm | DENY;PUB;SUB;PUB | SUB | 默認(rèn)的Topic權(quán)限 |
defaultGroupPerm | DENY;PUB;SUB;PUB | SUB | 默認(rèn)的ConsumerGroup權(quán)限 |
topicPerms | topic=權(quán)限 | 各個Topic的權(quán)限 | |
groupPerms | group=權(quán)限 | 各個ConsumerGroup的權(quán)限 |
權(quán)限標(biāo)識符的含義
權(quán)限 | 含義 |
---|---|
DENY | 拒絕 |
ANY | PUB 或者 SUB 權(quán)限 |
PUB | 發(fā)送權(quán)限 |
SUB | 訂閱權(quán)限 |
處理流程
特殊的請求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 賬戶進(jìn)行操作郑趁;
對于某個資源刊驴,如果有顯性配置權(quán)限,則采用配置的權(quán)限寡润;如果沒有顯性配置權(quán)限捆憎,則采用默認(rèn)的權(quán)限
RocketMQ的權(quán)限控制存儲的默認(rèn)實(shí)現(xiàn)是基于yml配置文件。用戶可以動態(tài)修改權(quán)限控制定義的屬性梭纹,而不需重新啟動Broker服務(wù)節(jié)點(diǎn)
如果ACL與高可用部署(Master/Slave架構(gòu))同時啟用躲惰,那么需要在Broker Master節(jié)點(diǎn)的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 設(shè)置全局白名單信息,即為將Slave節(jié)點(diǎn)的ip地址設(shè)置至Master節(jié)點(diǎn)plain_acl.yml配置文件的全局白名單中
1.3变抽、代碼示例
1.3.1础拨、生產(chǎn)者代碼
public class AclProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
}
}
查看結(jié)果
報錯提示topicA沒有權(quán)限,我們在plain_acl.yml文件中配置的也確實(shí)是RocketMQ用戶拒絕绍载,生產(chǎn)消費(fèi)topicA主題信息诡宗,我們改變主題為topicB,則發(fā)現(xiàn)發(fā)送消息成功,topicB=PUB|SUB設(shè)置的權(quán)限是生產(chǎn)消費(fèi)都可以击儡。
查看結(jié)果
1.3.2塔沃、消費(fèi)者代碼
public class AclConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("topicB", "*");
consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
}
}
查看結(jié)果:發(fā)現(xiàn)沒有任何消息被消費(fèi),也沒有報錯信息曙痘,對于RocketMQ用戶topicB設(shè)置的就是可以可以生產(chǎn)可以消費(fèi)的芳悲,但是我們發(fā)現(xiàn)其groupA=DENY是拒絕的立肘,說明消費(fèi)組是groupA則拒絕消費(fèi)任何消息,我們改成groupB或者groupC查看結(jié)果名扛。
2谅年、源碼分析
Broker端ACL原理圖
2.1、Broker初始化時ACL相關(guān)操作
Broker服務(wù)啟動時創(chuàng)建BrokerController并初始化initialize()時調(diào)用acl相關(guān)的初始化方法initialAcl()
private void initialAcl() {
//broker配置文件中是否開啟ACL功能肮韧,默認(rèn)關(guān)閉
if (!this.brokerConfig.isAclEnable()) {
log.info("The broker dose not enable acl");
return;
}
//獲取權(quán)限訪問校驗(yàn)器的列表融蹂,加載的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向
//org.apache.rocketmq.acl.plain.PlainAccessValidator,默認(rèn)只有一個
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The broker dose not load the AccessValidator");
return;
}
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
//注冊服務(wù)端就的“鉤子”對象弄企,對權(quán)限進(jìn)行校驗(yàn)
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
validator.validate(validator.parse(request, remoteAddr));
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
}
源碼中有相關(guān)的注解超燃,我們查看一下注冊registerServerRPCHook方法
public void registerServerRPCHook(RPCHook rpcHook) {
//服務(wù)端的NettyRemotingServer服務(wù)注冊“鉤子”函數(shù)
getRemotingServer().registerRPCHook(rpcHook);
this.fastRemotingServer.registerRPCHook(rpcHook);
}
關(guān)于NettyRemotingServer服務(wù)和NettyRemotingClient服務(wù)配合使用,后面章節(jié)RocketMQ Remoting會重點(diǎn)分析
2.2拘领、 PlainAccessValidator權(quán)限驗(yàn)證器
PlainAccessValidator.parse()意乓,根據(jù)客戶端不同的請求Code其需要的檢驗(yàn)資源也不一樣
switch (request.getCode()) {
//發(fā)送消息需要校驗(yàn)當(dāng)前的賬戶的topic是否具有PUB權(quán)限
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.SEND_MESSAGE_V2:
accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
//拉取消息時需要知道該consumer賬戶下拉取的topic是否具有SUB權(quán)限,并且還要知道訂閱組consumerGroup是否有sub權(quán)限
case RequestCode.PULL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
break;
case RequestCode.QUERY_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
break;
case RequestCode.HEART_BEAT:
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
}
}
break;
case RequestCode.UNREGISTER_CLIENT:
final UnregisterClientRequestHeader unregisterClientRequestHeader =
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.UPDATE_CONSUMER_OFFSET:
final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
break;
default:
break;
}
根據(jù)request.getCode()獲取當(dāng)前的操作需要的權(quán)限標(biāo)識集合约素,供后面與系統(tǒng)的權(quán)限配置文件plain_acl.yml中的權(quán)限標(biāo)識符校驗(yàn)時使用
2.3届良、PlainPermissionLoader資源加載器
Broker初始化相關(guān)服務(wù)的時候創(chuàng)建了PlainAccessValidator,我們發(fā)現(xiàn)其默認(rèn)的構(gòu)造方法中調(diào)用了其權(quán)限資源加載器PlainPermissionLoader
public PlainAccessValidator() {
aclPlugEngine = new PlainPermissionLoader();
}
創(chuàng)建PlainPermissionLoader對象
public PlainPermissionLoader() {
//加載服務(wù)端的權(quán)限文件plain_acl.yml
load();
//開啟線程每500ms檢測權(quán)限文件是否改變圣猎,若改變則執(zhí)行l(wèi)oad()從新加載權(quán)限文件
watch();
}
查看load方法流程
public void load() {
Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
log.info("Broker plain acl conf data is : ", plainAclConfData.toString());
//獲取全局白名單IP集合
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
//獲取賬戶權(quán)限集合
JSONArray accounts = plainAclConfData.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
//構(gòu)建每個賬戶的權(quán)限資源
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
//放入Map中AccessKey作為key士葫,該賬戶的權(quán)限資源作為value
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
}
}
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;
}
加載資源文件,解析其中的權(quán)限標(biāo)識送悔,等待權(quán)限校驗(yàn)器PlainAccessValidator調(diào)用其validate()對權(quán)限校驗(yàn)
2.4慢显、權(quán)限校驗(yàn)流程
核心的校驗(yàn)方法PlainPermissionLoader.validate()
public void validate(PlainAccessResource plainAccessResource) {
//全局的白名單IP進(jìn)行校驗(yàn)
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
//匹配成功說明是全局的白名單IP,具有所有權(quán)限欠啤,直接返回荚藻。
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
//判斷用戶名是否為空,null則拋出AclException異常
if (plainAccessResource.getAccessKey() == null) {
throw new AclException(String.format("No accessKey is configured"));
}
//校驗(yàn)賬戶是否存在于服務(wù)端的權(quán)限資源文件中plain_acl.yml洁段,不在則拋出異常
if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
}
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
//檢查該賬戶的白名單IP是否匹配上客戶端IP,匹配成功具有所有權(quán)限鞋喇,除UPDATE_AND_CREATE_TOPIC等特殊權(quán)限需要管理員權(quán)限
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
//校驗(yàn)簽名
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
//校驗(yàn)賬戶內(nèi)的資源權(quán)限
checkPerm(plainAccessResource, ownedAccess);
}
查看其對于當(dāng)前賬戶內(nèi)部的資源校驗(yàn)
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
//判斷請求的命令的Code是否需要管理員權(quán)限,并判斷該用戶是否是管理員
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
// If the needCheckedPermMap is null,then return
return;
}
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
//判斷是否是group眉撵,在構(gòu)建resourcePermMap時候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup
boolean isGroup = PlainAccessResource.isRetryTopic(resource);
//系統(tǒng)的權(quán)限配置文件中配置項包不含該客戶端命令請求需要的權(quán)限
if (!ownedPermMap.containsKey(resource)) {
//判斷其是否是topic還是group的權(quán)限標(biāo)識落塑,獲取該類型的全局的權(quán)限是什么
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
//核對權(quán)限
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
continue;
}
//系統(tǒng)的權(quán)限配置文件中配置項包含該客戶端命令請求需要的權(quán)限纽疟,則直接判斷其權(quán)限
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
}
所有的檢驗(yàn)流程如果有一項不滿足則拋出AclException異常
2.5、客戶端發(fā)送請求
上面圖中只是分析了Broker服務(wù)端的處理流程憾赁,客戶端如何調(diào)用我們具體分析下我們以發(fā)送消息為例:
我們之前分析過Producer的消息發(fā)送的核心方法是DefaultMQProducerImpl.sendKernelImpl()該方法
//是否注冊了“鉤子”
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
//封裝其ACL請求的參數(shù)信息
this.executeSendMessageHookBefore(context);
}
hasSendMessageHook(),我們在構(gòu)建Producer的時候創(chuàng)建了該對象污朽,加入到DefaultMQProducerImpl的sendMessageHookList屬性中。
我們查看其發(fā)送消息NettyRemotingClient類中調(diào)用AclClientRPCHook.doBeforeRequest()發(fā)送前的數(shù)據(jù)準(zhǔn)備
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
byte[] total = AclUtils.combineRequestContent(request,
parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature);
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
}
只是構(gòu)建簽名signature和Token龙考,準(zhǔn)備改數(shù)據(jù)供Broker端檢驗(yàn)權(quán)限時使用蟆肆。