1. 背景
????Apache Sentry 是Cloudera公司發(fā)布的一個Hadoop開源組件筒繁,提供了細粒度級厚掷、基于角色的授權(quán)以及多租戶的管理模式兔魂,主要針對存儲在Hadoop集群上的數(shù)據(jù)和元數(shù)據(jù)栈妆。它可以和Hive/Hcatalog扛吞、Apache Solr 和Cloudera Impala等集成初肉,未來可以擴展到其他Hadoop生態(tài)系統(tǒng)組件酷鸦,如HDFS和HBase。
????Sentry旨在成為可插拔授權(quán)引擎的Hadoop組件牙咏。允許定義授權(quán)規(guī)則以驗證用戶或應(yīng)用程序?qū)adoop資源的訪問請求臼隔。Sentry是高度模塊化的,可以支持Hadoop中各種數(shù)據(jù)模型的授權(quán)妄壶。
2. 本文的目標(biāo)
? ? 本文目標(biāo)人群為初次接觸apache sentry的開發(fā)人員摔握,幫助其找到代碼的入口函數(shù),快速摸清代碼的架構(gòu)丁寄,梳理系統(tǒng)的結(jié)構(gòu)氨淌,追蹤代碼的調(diào)用路徑,為之后的深入閱讀打下基礎(chǔ)伊磺。建議閱讀之前盛正,在網(wǎng)上搜索sentry相關(guān)的資料,了解其架構(gòu)屑埋,原理豪筝,以便更好地了解源代碼。寫這篇文章時摘能,本人也是剛接觸sentry续崖,有些心得,與大家分享徊哑。更多深入的理解袜刷,請關(guān)注下一次的分享
3. sentry簡單介紹
? ? 權(quán)限模型,在很多系統(tǒng)我們都見過莺丑,先是資源著蟹,然后是權(quán)限,權(quán)限是對資源的訪問規(guī)則梢莽,然后有角色萧豆,角色是一組權(quán)限,最后是用戶昏名,角色賦予給用戶涮雷,有時候也會有組的概念,相同屬性的用戶劃成一組轻局,角色賦予給組洪鸭。sentry的權(quán)限模型也是同樣的原理样刷。它包括一下元素:
? ? a. Resource:權(quán)限的對象,包括server, 庫览爵,表置鼻,行,uri等
? ? b. Privilege:? 權(quán)限蜓竹,可以連接成一組訪問規(guī)則箕母。比如用戶A可以對表T進行讀訪問,但不能刪除俱济。
? ? c. Role:角色是一組權(quán)限的集合
? ? d. Group:相當(dāng)于用戶這個概念嘶是,sentry不存在用戶這個概念。都是以Group來表達蛛碌。你可以理解成sentry的組就是用戶或者賬號的概念聂喇。角色賦予給Group。
4. 代碼初讀
4.1 入口函數(shù)
? ? 閱讀源代碼蔚携,最好的方式是找到入口函數(shù)授帕,從入口函數(shù)一步步往下閱讀,自然能夠梳理出代碼的整體架構(gòu)浮梢。
? ? Sentry入口函數(shù)位于:sentry-tools/src/main/java/org/apache/sentry/SentryMain.java
為什么我知道在這兒,根據(jù)啟動時采用命令行的方式彤路,帶參數(shù)秕硝。一般都會有main函數(shù),還是經(jīng)驗問題哈洲尊。
public static void main(String[] args)
? ? ? ? ? ? throws Exception {
? ? ? ? CommandLineParser parser = new GnuParser();
? ? ? ? Options options = new Options();
? ? ? ? options.addOption(HELP_SHORT, HELP_LONG, false, "Print this help text");
? ? ? ? options.addOption(VERSION_SHORT, VERSION_LONG, false,
? ? ? ? ? ? ? ? "Print Sentry version");
? ? ? ? options.addOption(HIVE_CONF, true, "Set hive configuration variables");
? ? ? ? options.addOption(null, COMMAND, true, "Command to run. Options: " + COMMANDS);
? ? ? ? options.addOption(null, LOG4J_CONF, true, "Location of log4j properties file");
? ? ? ? //Ignore unrecognized options: service and config-tool options
? ? ? ? CommandLine commandLine = parser.parse(options, args, true);
? ? ? ? String log4jconf = commandLine.getOptionValue(LOG4J_CONF);
? ? ? ? if (log4jconf != null && log4jconf.length() > 0) {
? ? ? ? ? ? Properties log4jProperties = new Properties();
? ? ? ? ? ? // Firstly load log properties from properties file
? ? ? ? ? ? try (InputStream istream = Files.newInputStream(Paths.get(log4jconf))) {
? ? ? ? ? ? ? ? log4jProperties.load(istream);
? ? ? ? ? ? }
? ? ? ? ? ? // Set the log level of DataNucleus.Query to INFO only if it is not set in the
? ? ? ? ? ? // properties file
? ? ? ? ? ? if (!log4jProperties.containsKey(LOG4J_DATANUCLEUS)) {
? ? ? ? ? ? ? ? log4jProperties.setProperty(LOG4J_DATANUCLEUS, "INFO");
? ? ? ? ? ? ? ? // Enable debug log for DataNucleus.Query only when log.threshold is TRACE
? ? ? ? ? ? ? ? String logThreshold = log4jProperties.getProperty("log.threshold");
? ? ? ? ? ? ? ? if (logThreshold != null && logThreshold.equalsIgnoreCase("TRACE")) {
? ? ? ? ? ? ? ? ? ? log4jProperties.setProperty(LOG4J_DATANUCLEUS, "DEBUG");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? PropertyConfigurator.configure(log4jProperties);
? ? ? ? ? ? Logger sentryLogger = LoggerFactory.getLogger(SentryMain.class);
? ? ? ? ? ? sentryLogger.info("Configuring log4j to use [" + log4jconf + "]");
? ? ? ? }
? ? ? ? //Print sentry help only if commandName was not given,
? ? ? ? // otherwise we assume the help is for the sub command
? ? ? ? String commandName = commandLine.getOptionValue(COMMAND);
? ? ? ? if (commandName == null && (commandLine.hasOption(HELP_SHORT) ||
? ? ? ? ? ? ? ? commandLine.hasOption(HELP_LONG))) {
? ? ? ? ? ? printHelp(options, "Command name is missing.");
? ? ? ? } else if (commandLine.hasOption(VERSION_SHORT) ||
? ? ? ? ? ? ? ? commandLine.hasOption(VERSION_LONG)) {
? ? ? ? ? ? printVersion();
? ? ? ? }
? ? ? ? Command command = null;
? ? ? ? switch (commandName){
? ? ? ? ? ? case "service":
? ? ? ? ? ? ? ? command = new SentryService.CommandImpl();
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? case "config-tool":
? ? ? ? ? ? ? ? command = new SentryConfigTool.CommandImpl();
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? case "schema-tool":
? ? ? ? ? ? ? ? command = new SentrySchemaTool.CommandImpl();
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? default:
? ? ? ? ? ? ? ? printHelp(options, "Unknown command " + commandName + "\n");
? ? ? ? ? ? ? ? break;
? ? ? ? }
? ? ? ? ((Command)command).run(commandLine.getArgs());
? ? }
這段代碼主要是根據(jù)傳入的參數(shù)远豺,解析參數(shù),讀取配置文件坞嘀,初始化日志躯护。其他的大致看一眼,最主要的是這個函數(shù):command = new SentryService.CommandImpl();
4.2 啟動過程:
? ? sentry啟動過程主要看這個類:sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryService.java丽涩。
? ??
public SentryService(Configuration conf) throws Exception {
? ? this.conf = conf;
? ? int port = conf
? ? ? ? .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
? ? if (port == 0) {
? ? ? port = findFreePort();
? ? ? conf.setInt(ServerConfig.RPC_PORT, port);
? ? }
? ? this.address = NetUtils.createSocketAddr(
? ? ? ? conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
? ? ? ? port);
? ? LOGGER.info("Configured on address {}", address);
? ? kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
? ? ? ? conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());
? ? maxThreads = conf.getInt(ServerConfig.RPC_MAX_THREADS,
? ? ? ? ServerConfig.RPC_MAX_THREADS_DEFAULT);
? ? minThreads = conf.getInt(ServerConfig.RPC_MIN_THREADS,
? ? ? ? ServerConfig.RPC_MIN_THREADS_DEFAULT);
? ? maxMessageSize = conf.getLong(ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE,
? ? ? ? ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
? ? if (kerberos) {
? ? ? // Use Hadoop libraries to translate the _HOST placeholder with actual hostname
? ? ? try {
? ? ? ? String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required");
? ? ? ? principal = SecurityUtil.getServerPrincipal(rawPrincipal, address.getAddress());
? ? ? } catch(IOException io) {
? ? ? ? throw new RuntimeException("Can't translate kerberos principal'", io);
? ? ? }
? ? ? LOGGER.info("Using kerberos principal: {}", principal);
? ? ? principalParts = SaslRpcServer.splitKerberosName(principal);
? ? ? Preconditions.checkArgument(principalParts.length == 3,
? ? ? ? ? "Kerberos principal should have 3 parts: " + principal);
? ? ? keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
? ? ? ? ? ServerConfig.KEY_TAB + " is required");
? ? ? File keytabFile = new File(keytab);
? ? ? Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
? ? ? ? ? "Keytab %s does not exist or is not readable.", keytab);
? ? } else {
? ? ? principal = null;
? ? ? principalParts = null;
? ? ? keytab = null;
? ? }
? ? ThreadFactory sentryServiceThreadFactory = new ThreadFactoryBuilder()
? ? ? ? .setNameFormat(SENTRY_SERVICE_THREAD_NAME)
? ? ? ? .build();
? ? serviceExecutor = Executors.newSingleThreadExecutor(sentryServiceThreadFactory);
? ? this.sentryStore = getSentryStore(conf);
? ? sentryStore.setPersistUpdateDeltas(SentryServiceUtil.isHDFSSyncEnabled(conf));
? ? this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf);
? ? status = Status.NOT_STARTED;
? ? // Enable signal handler for HA leader/follower status if configured
? ? String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG);
? ? if ((sigName != null) && !sigName.isEmpty()) {
? ? ? LOGGER.info("Registering signal handler {} for HA", sigName);
? ? ? try {
? ? ? ? registerSigListener(sigName, this);
? ? ? } catch (Exception e) {
? ? ? ? LOGGER.error("Failed to register signal", e);
? ? ? }
? ? }
? }
? ? 首先看下這個類的構(gòu)造函數(shù)棺滞,主要是根據(jù)傳入的配置文件,確定thrift的端口矢渊,最大線程數(shù)继准,最小線程數(shù),最大消息size矮男。還有監(jiān)控移必,kerberos驗證,可用行HA的配置毡鉴。最主要的是看:?this.sentryStore = getSentryStore(conf);這個函數(shù)崔泵。由于是采用thrift這種rpc框架秒赤,所以要注冊processor,以及根據(jù)thrift IDL生成實際的處理函數(shù)憎瘸。要想弄懂Sentry入篮,建議先去弄懂Thrift這個RPC框架。
private void runServer() throws Exception {
? ? startSentryStoreCleaner(conf);
? ? startHMSFollower(conf);
? ? Iterable<String> processorFactories = ConfUtilties.CLASS_SPLITTER
? ? ? ? .split(conf.get(ServerConfig.PROCESSOR_FACTORIES,
? ? ? ? ? ? ServerConfig.PROCESSOR_FACTORIES_DEFAULT).trim());
? ? TMultiplexedProcessor processor = new TMultiplexedProcessor();
? ? boolean registeredProcessor = false;
? ? for (String processorFactory : processorFactories) {
? ? ? Class<?> clazz = conf.getClassByName(processorFactory);
? ? ? if (!ProcessorFactory.class.isAssignableFrom(clazz)) {
? ? ? ? throw new IllegalArgumentException("Processor Factory "
? ? ? ? ? ? + processorFactory + " is not a "
? ? ? ? ? ? + ProcessorFactory.class.getName());
? ? ? }
? ? ? try {
? ? ? ? Constructor<?> constructor = clazz
? ? ? ? ? ? .getConstructor(Configuration.class);
? ? ? ? LOGGER.info("ProcessorFactory being used: " + clazz.getCanonicalName());
? ? ? ? ProcessorFactory factory = (ProcessorFactory) constructor
? ? ? ? ? ? .newInstance(conf);
? ? ? ? boolean registerStatus = factory.register(processor, sentryStore);
? ? ? ? if (!registerStatus) {
? ? ? ? ? LOGGER.error("Failed to register " + clazz.getCanonicalName());
? ? ? ? }
? ? ? ? registeredProcessor = registerStatus || registeredProcessor;
? ? ? } catch (Exception e) {
? ? ? ? throw new IllegalStateException("Could not create "
? ? ? ? ? ? + processorFactory, e);
? ? ? }
? ? }
? ? if (!registeredProcessor) {
? ? ? throw new IllegalStateException(
? ? ? ? ? "Failed to register any processors from " + processorFactories);
? ? }
? ? addSentryServiceGauge();
? ? TServerTransport serverTransport = new TServerSocket(address);
? ? TTransportFactory transportFactory = null;
? ? if (kerberos) {
? ? ? TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
? ? ? saslTransportFactory.addServerDefinition(AuthMethod.KERBEROS
? ? ? ? ? .getMechanismName(), principalParts[0], principalParts[1],
? ? ? ? ? ? ? ServerConfig.SASL_PROPERTIES, new GSSCallback(conf));
? ? ? transportFactory = saslTransportFactory;
? ? } else {
? ? ? transportFactory = new TTransportFactory();
? ? }
? ? TThreadPoolServer.Args args = new TThreadPoolServer.Args(
? ? ? ? serverTransport).processor(processor)
? ? ? ? .transportFactory(transportFactory)
? ? ? ? .protocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
? ? ? ? .minWorkerThreads(minThreads).maxWorkerThreads(maxThreads);
? ? thriftServer = new TThreadPoolServer(args);
? ? LOGGER.info("Serving on {}", address);
? ? startSentryWebServer();
? ? // thriftServer.serve() does not return until thriftServer is stopped. Need to log before
? ? // calling thriftServer.serve()
? ? LOGGER.info("Sentry service is ready to serve client requests");
? ? // Allow clients/users watching the console to know when sentry is ready
? ? System.out.println("Sentry service is ready to serve client requests");
? ? SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.SERVICE_RUNNING);
? ? thriftServer.serve();
? }
? ? 這段代碼其他的不太重要含思,主要的看看崎弃,接受到RPC請求后怎么處理。既然采用了Thrift框架含潘,必然對應(yīng)的有根據(jù)IDL生成處理函數(shù)饲做。processor就是處理函數(shù),進一步去看看processFactories遏弱,從配置文件中可以看到采用的這個processorFactory:org.apache.sentry.api.service.thrift.SentryPolicyStoreProcessorFactory盆均,進去看看里面干了啥?
public SentryPolicyStoreProcessorFactory(Configuration conf) {
? ? super(conf);
? }
? public boolean register(TMultiplexedProcessor multiplexedProcessor,
? ? ? ? ? ? ? ? ? ? ? ? ? SentryStoreInterface sentryStore) throws Exception {
? ? SentryPolicyStoreProcessor sentryServiceHandler =
? ? ? ? new SentryPolicyStoreProcessor(SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME,
? ? ? ? ? ? conf, sentryStore);
? ? TProcessor processor =
? ? ? new SentryProcessorWrapper<SentryPolicyService.Iface>(sentryServiceHandler);
? ? multiplexedProcessor.registerProcessor(
? ? ? SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME, processor);
? ? return true;
? }
? ? 這個類在注冊時漱逸,指定具體的處理類:SentryPolicyStoreProcessor泪姨,這個類里面都是thrift生成的每個接口的處理函數(shù),如:drop_sentry_role饰抒,這個接口時刪除角色的RPC接口肮砾。之后的邏輯,大家可以看具體的函數(shù)了袋坑,再次我不進一步分享了
5. 總結(jié)
? ? 今天主要時跟大家簡單分享了下sentry的背景和原理仗处,以及對讀源碼進行一件簡單的引路。本人能力有限枣宫,提出了粗鄙的見解婆誓,有不足之處,請大家指出也颤,分享交流洋幻!