RocketMQ ACL使用指南
- 2019 年 11 月 14 日
- 筆記
目錄
@(本節目錄)
1、什麼是ACL?
ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、許可權、角色等概念,那在RocketMQ中上述會對應哪些對象呢?
- 用戶
用戶是訪問控制的基礎要素,也不難理解,RocketMQ ACL必然也會引入用戶的概念,即支援用戶名、密碼。 - 資源
資源,需要保護的對象,在RocketMQ中,消息發送涉及的Topic、消息消費涉及的消費組,應該進行保護,故可以抽象成資源。 - 許可權
針對資源,能進行的操作, - 角色
RocketMQ中,只定義兩種角色:是否是管理員。
另外,RocketMQ還支援按照客戶端IP進行白名單設置。
2、ACL基本流程圖
在講解如何使用ACL之前,我們先簡單看一下RocketMQ ACL的請求流程:
對於上述具體的實現,將在後續文章中重點講解,本文的目的只是希望給讀者一個大概的了解。
3、如何配置ACL
3.1 acl配置文件
acl默認的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下。下面對其配置項一一介紹。
3.1.1 globalWhiteRemoteAddresses
全局白名單,其類型為數組,即支援多個配置。其支援的配置格式如下:
- 空
表示不設置白名單,該條規則默認返回false。 - "*"
表示全部匹配,該條規則直接返回true,將會阻斷其他規則的判斷,請慎重使用。 - 192.168.0.{100,101}
多地址配置模式,ip地址的最後一組,使用{},大括弧中多個ip地址,用英文逗號(,)隔開。 - 192.168.1.100,192.168.2.100
直接使用,分隔,配置多個ip地址。 - 192.168..或192.168.100-200.10-20
每個IP段使用 "*" 或"-"表示範圍。
3.1.2 accounts
配置用戶資訊,該類型為數組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
3.1.2.1 accessKey
登錄用戶名,長度必須大於6個字元。
3.1.2.2 secretKey
登錄密碼。長度必須大於6個字元。
3.1.2.3 whiteRemoteAddress
用戶級別的IP地址白名單。其類型為一個字元串,其配置規則與globalWhiteRemoteAddresses,但只能配置一條規則。
3.1.2.4 admin
boolean類型,設置是否是admin。如下許可權只有admin=true時才有許可權執行。
- UPDATE_AND_CREATE_TOPIC
更新或創建主題。 - UPDATE_BROKER_CONFIG
更新Broker配置。 - DELETE_TOPIC_IN_BROKER
刪除主題。 - UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
更新或創建訂閱組資訊。 - DELETE_SUBSCRIPTIONGROUP
刪除訂閱組資訊。
3.1.2.5 defaultTopicPerm
默認topic許可權。該值默認為DENY(拒絕)。
3.1.2.6 defaultGroupPerm
默認消費組許可權,該值默認為DENY(拒絕),建議值為SUB。
3.1.2.7 topicPerms
設置topic的許可權。其類型為數組,其可選擇值在下節介紹。
3.1.2.8 groupPerms
設置消費組的許可權。其類型為數組,其可選擇值在下節介紹。可以為每一消費組配置不一樣的許可權。
3.2 RocketMQ ACL許可權可選值
- DENY
拒絕。 - PUB
擁有發送許可權。 - SUB
擁有訂閱許可權。
3.3、許可權驗證流程
上面定義了全局白名單、用戶級別的白名單,用戶級別的許可權,為了更好的配置ACL許可權規則,下面給出許可權匹配邏輯。
4、使用示例
4.1 Broker端安裝
首先,需要在broker.conf文件中,增加參數aclEnable=true。並拷貝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目錄。
broker.conf的配置文件如下:
brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort=10915 storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog namesrvAddr=127.0.0.1:9876 autoCreateTopicEnable=false aclEnable=true
plain_acl.yml文件內容如下:
globalWhiteRemoteAddresses: accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - TopicTest=PUB groupPerms: # the group should convert to retry topic - oms_consumer_group=DENY - accessKey: admin secretKey: 12345678 whiteRemoteAddress: # if it is admin, it could access all resources admin: true
從上面的配置可知,用戶RocketMQ只能發送TopicTest的消息,其他topic無許可權發送;拒絕oms_consumer_group消費組的消息消費,其他消費組默認可消費。
4.2 消息發送端示例
public class AclProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1; i++) { try { Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678")); } }
運行效果如圖所示:
4.3 消息消費端示例
public class AclConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678")); } }
發現並不沒有消費消息,符合預期。
關於RocketMQ ACL的使用就介紹到這裡了,下一篇將介紹RocketMQ ACL實現原理。
推薦閱讀:
1、RocketMQ實戰:生產環境中,autoCreateTopicEnable為什麼不能設置為true
2、RocketMQ 消息發送system busy、broker busy原因分析與解決方案
作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 並發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。