diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7d8e5bc..f017482 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -413,13 +413,19 @@
         "The delegation token store implementation. Set to org.apache.hadoop.hive.thrift.ZooKeeperTokenStore for load-balanced cluster."),
     METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_CONNECTSTR(
         "hive.cluster.delegation.token.store.zookeeper.connectString", "",
-        "The ZooKeeper token store connect string."),
+        "The ZooKeeper token store connect string. You can reuse the configuration value\n" +
+        "set in hive.zookeeper.quorum by leaving this parameter unset."),
     METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ZNODE(
-        "hive.cluster.delegation.token.store.zookeeper.znode", "/hive/cluster/delegation",
-        "The root path for token store data."),
+        "hive.cluster.delegation.token.store.zookeeper.znode", "/hivedelegation",
+        "The root path for token store data. Note that this is used by both HiveServer2 and\n" +
+        "MetaStore to store delegation tokens. One directory is created for each of them.\n" +
+        "The final directory names have the server name appended to them (HIVESERVER2,\n" +
+        "METASTORE)."),
     METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ACL(
         "hive.cluster.delegation.token.store.zookeeper.acl", "",
-        "ACL for token store entries. List comma separated all server principals for the cluster."),
+        "ACL for token store entries. Comma-separated list of ACL entries. For example:\n" +
+        "sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa\n" +
+        "Defaults to all permissions for the hiveserver2/metastore process user."),
     METASTORE_CACHE_PINOBJTYPES("hive.metastore.cache.pinobjtypes",
         "Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order",
         "List of comma separated metastore object types that should be pinned in the cache"),
     METASTORE_CONNECTION_POOLING_TYPE("datanucleus.connectionPoolingType", "BONECP",
@@ -1246,10 +1252,13 @@
     // Zookeeper related configs
     HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "",
-        "List of ZooKeeper servers to talk to. This is needed for: " +
-        "1. Read/write locks - when hive.lock.manager is set to " +
-        "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, " +
-        "2. When HiveServer2 supports service discovery via Zookeeper."),
+        "List of ZooKeeper servers to talk to. This is needed for:\n" +
+        "1. Read/write locks - when hive.lock.manager is set to\n" +
+        "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager,\n" +
+        "2. Service discovery via ZooKeeper, when it is enabled for HiveServer2.\n" +
+        "3. Delegation token storage, if the ZooKeeper token store is used and\n" +
+        "hive.cluster.delegation.token.store.zookeeper.connectString is not set."),
+
     HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181", "The port of ZooKeeper servers to talk to. 
" + "If the list of Zookeeper servers specified in hive.zookeeper.quorum," + diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml index 6065748..4969708 100644 --- a/hcatalog/webhcat/svr/pom.xml +++ b/hcatalog/webhcat/svr/pom.xml @@ -72,14 +72,11 @@ commons-exec ${commons-exec.version} - - - + org.apache.curator curator-framework ${curator.version} - org.apache.zookeeper zookeeper @@ -203,37 +200,6 @@ - - org.apache.maven.plugins - maven-shade-plugin - - - include-curator - - package - - shade - - - true - - - org.apache.curator - - - - - org.apache.curator - webhcat.org.apache.curator - - - - - - diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java index b81942a..b2bdafa 100644 --- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java +++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -98,9 +99,9 @@ protected DelegationTokenStore getTokenStore(Configuration conf) throws IOExcept } @Override - public void startDelegationTokenSecretManager(Configuration conf, Object hms) + public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode sm) throws IOException{ - super.startDelegationTokenSecretManager(conf, hms); + super.startDelegationTokenSecretManager(conf, hms, sm); isMetastoreTokenManagerInited = true; } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java index 8860d30..0b61a62 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java @@ -37,7 +37,7 @@ public void testDBTokenStore() throws TokenStoreException, MetaException, IOException { DelegationTokenStore ts = new DBTokenStore(); - ts.setStore(new HMSHandler("Test handler").getMS()); + ts.init(new HMSHandler("Test handler").getMS(), null); assertEquals(0, ts.getMasterKeys().length); assertEquals(false,ts.removeMasterKey(-1)); try{ diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java index 83a80b4..26d4d97 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java @@ -24,25 +24,28 @@ import junit.framework.TestCase; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.io.Text; 
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; import org.junit.Assert; public class TestZooKeeperTokenStore extends TestCase { private MiniZooKeeperCluster zkCluster = null; - private ZooKeeper zkClient = null; + private CuratorFramework zkClient = null; private int zkPort = -1; private ZooKeeperTokenStore ts; // connect timeout large enough for slower test environments private final int connectTimeoutMillis = 30000; + private final int sessionTimeoutMillis = 3000; @Override protected void setUp() throws Exception { @@ -53,8 +56,10 @@ protected void setUp() throws Exception { this.zkCluster = new MiniZooKeeperCluster(); this.zkPort = this.zkCluster.startup(zkDataDir); - this.zkClient = ZooKeeperTokenStore.createConnectedClient("localhost:" + zkPort, 3000, - connectTimeoutMillis); + this.zkClient = CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort) + .sessionTimeoutMs(sessionTimeoutMillis).connectionTimeoutMs(connectTimeoutMillis) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + this.zkClient.start(); } @Override @@ -84,14 +89,16 @@ private Configuration createConf(String zkPath) { public void testTokenStorage() throws Exception { String ZK_PATH = "/zktokenstore-testTokenStorage"; ts = new ZooKeeperTokenStore(); - ts.setConf(createConf(ZK_PATH)); + Configuration conf = createConf(ZK_PATH); + conf.set(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa"); + ts.setConf(conf); + ts.init(null, ServerMode.METASTORE); + + String metastore_zk_path = ZK_PATH + ServerMode.METASTORE; int keySeq = ts.addMasterKey("key1Data"); - byte[] keyBytes = zkClient.getData( - ZK_PATH - + "/keys/" - + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT, - keySeq), false, null); + byte[] keyBytes = zkClient.getData().forPath( + metastore_zk_path + "/keys/" + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT, keySeq)); assertNotNull(keyBytes); assertEquals(new String(keyBytes), "key1Data"); @@ -116,8 +123,7 @@ public void testTokenStorage() throws Exception { HiveDelegationTokenSupport .encodeDelegationTokenInformation(tokenInfoRead)); - List allIds = ts - .getAllDelegationTokenIdentifiers(); + List allIds = ts.getAllDelegationTokenIdentifiers(); assertEquals(1, allIds.size()); Assert.assertEquals(TokenStoreDelegationTokenSecretManager .encodeWritable(tokenId), @@ -138,10 +144,10 @@ public void testAclNoAuth() throws Exception { ts = new ZooKeeperTokenStore(); try { ts.setConf(conf); + ts.init(null, ServerMode.METASTORE); fail("expected ACL exception"); } catch (DelegationTokenStore.TokenStoreException e) { - assertEquals(e.getCause().getClass(), - KeeperException.NoAuthException.class); + assertEquals(KeeperException.NoAuthException.class, e.getCause().getClass()); } } @@ -159,10 +165,10 @@ public void testAclInvalid() throws Exception { ts = new ZooKeeperTokenStore(); try { ts.setConf(conf); + ts.init(null, ServerMode.METASTORE); fail("expected ACL exception"); } catch (DelegationTokenStore.TokenStoreException e) { - assertEquals(e.getCause().getClass(), - KeeperException.InvalidACLException.class); + assertEquals(KeeperException.InvalidACLException.class, e.getCause().getClass()); } } @@ -171,10 +177,11 @@ public void testAclPositive() throws 
Exception { Configuration conf = createConf(ZK_PATH); conf.set( HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, - "world:anyone:cdrwa,ip:127.0.0.1:cdrwa"); + "ip:127.0.0.1:cdrwa,world:anyone:cdrwa"); ts = new ZooKeeperTokenStore(); ts.setConf(conf); - List acl = zkClient.getACL(ZK_PATH, new Stat()); + ts.init(null, ServerMode.METASTORE); + List acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE); assertEquals(2, acl.size()); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 3cf43a9..e3240ca 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -149,8 +149,8 @@ import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; -import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; @@ -192,6 +192,7 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.hive.thrift.TUGIContainingTransport; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -5795,7 +5796,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE), conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); // start delegation token manager - saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS()); + saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS(), ServerMode.METASTORE); transFactory = saslServer.createTransportFactory( MetaStoreUtils.getMetaStoreSaslProperties(conf)); processor = saslServer.wrapProcessor( diff --git a/pom.xml b/pom.xml index c694980..825fa7f 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,7 @@ 3.4.5 1.1 2.4.0 - 2.5.0 + 2.6.0 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java index 11dd962..29d0531 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java @@ -18,18 +18,12 @@ package org.apache.hadoop.hive.ql.util; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -39,7 +33,7 @@ public class ZooKeeperHiveHelper 
{ public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName()); public static final String ZOOKEEPER_PATH_SEPARATOR = "/"; - public static final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient"; + /** * Get the ensemble server addresses from the configuration. The format is: host1:port, * host2:port.. @@ -97,59 +91,9 @@ public static String createPathRecursively(ZooKeeper zooKeeperClient, String pat * A no-op watcher class */ public static class DummyWatcher implements Watcher { + @Override public void process(org.apache.zookeeper.WatchedEvent event) { } } - /** - * Dynamically sets up the JAAS configuration - * @param principal - * @param keyTabFile - */ - public static void setUpJaasConfiguration(String principal, String keyTabFile) { - JaasConfiguration jaasConf = - new JaasConfiguration(ZooKeeperHiveHelper.SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile); - // Install the Configuration in the runtime. - javax.security.auth.login.Configuration.setConfiguration(jaasConf); - } - - /** - * A JAAS configuration for ZooKeeper clients intended to use for SASL Kerberos. - */ - private static class JaasConfiguration extends javax.security.auth.login.Configuration { - // Current installed Configuration - private javax.security.auth.login.Configuration baseConfig = - javax.security.auth.login.Configuration.getConfiguration(); - private final String loginContextName; - private final String principal; - private final String keyTabFile; - - public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) { - this.loginContextName = hiveLoginContextName; - this.principal = principal; - this.keyTabFile = keyTabFile; - } - - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { - if (loginContextName.equals(appName)) { - Map krbOptions = new HashMap(); - krbOptions.put("doNotPrompt", "true"); - krbOptions.put("storeKey", "true"); - krbOptions.put("useKeyTab", "true"); - krbOptions.put("principal", principal); - krbOptions.put("keyTab", keyTabFile); - krbOptions.put("refreshKrb5Config", "true"); - AppConfigurationEntry hiveZooKeeperClientEntry = - new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), - LoginModuleControlFlag.REQUIRED, krbOptions); - return new AppConfigurationEntry[] { hiveZooKeeperClientEntry }; - } - // Try the base config - if (baseConfig != null) { - return baseConfig.getAppConfigurationEntry(appName); - } - return null; - } - } } diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index ab34d2d..72bc724 100644 --- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.thrift.ThriftCLIService; @@ -98,7 +99,7 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException { conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); // start delegation token manager try { - saslServer.startDelegationTokenSecretManager(conf, null); + saslServer.startDelegationTokenSecretManager(conf, null, 
ServerMode.HIVESERVER2); } catch (IOException e) { throw new TTransportException("Failed to start token manager", e); } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 0aab3f9..3c37f24 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -53,7 +53,6 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; /** @@ -178,23 +177,19 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { */ private void setUpAuthAndAcls(HiveConf hiveConf, List nodeAcls) throws Exception { if (ShimLoader.getHadoopShims().isSecurityEnabled()) { - String principal = - ShimLoader.getHadoopShims().getResolvedPrincipal( - hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); - String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); if (principal.isEmpty()) { throw new IOException( "HiveServer2 Kerberos principal is empty"); } + String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); if (keyTabFile.isEmpty()) { throw new IOException( "HiveServer2 Kerberos keytab is empty"); } - // ZooKeeper property name to pick the correct JAAS conf section - System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, - ZooKeeperHiveHelper.SASL_LOGIN_CONTEXT_NAME); + // Install the JAAS Configuration for the runtime - ZooKeeperHiveHelper.setUpJaasConfiguration(principal, keyTabFile); + ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile); // Read all to the world nodeAcls.addAll(Ids.READ_ACL_UNSAFE); // Create/Delete/Write/Admin to the authenticated user @@ -212,6 +207,7 @@ private void setUpAuthAndAcls(HiveConf hiveConf, List nodeAcls) throws Exce * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper. */ private class DeRegisterWatcher implements Watcher { + @Override public void process(WatchedEvent event) { if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { HiveServer2.this.setRegisteredWithZooKeeper(false); @@ -389,7 +385,7 @@ public static void main(String[] args) { private final Options options = new Options(); private org.apache.commons.cli.CommandLine commandLine; private final String serverName; - private StringBuilder debugMessage = new StringBuilder(); + private final StringBuilder debugMessage = new StringBuilder(); @SuppressWarnings("static-access") ServerOptionsProcessor(String serverName) { @@ -453,7 +449,7 @@ StringBuilder getDebugMessage() { * The response sent back from {@link ServerOptionsProcessor#parse(String[])} */ static class ServerOptionsProcessorResponse { - private ServerOptionsExecutor serverOptionsExecutor; + private final ServerOptionsExecutor serverOptionsExecutor; ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) { this.serverOptionsExecutor = serverOptionsExecutor; @@ -483,6 +479,7 @@ ServerOptionsExecutor getServerOptionsExecutor() { this.serverName = serverName; } + @Override public void execute() { new HelpFormatter().printHelp(serverName, options); System.exit(0); @@ -494,6 +491,7 @@ public void execute() { * This is the default executor, when no option is specified. 
*/ static class StartOptionExecutor implements ServerOptionsExecutor { + @Override public void execute() { try { startHiveServer2(); @@ -515,6 +513,7 @@ public void execute() { this.versionNumber = versionNumber; } + @Override public void execute() { try { deleteServerInstancesFromZooKeeper(versionNumber); diff --git a/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java b/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index d18ae44..9d2ff55 100644 --- a/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ b/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -59,7 +59,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; -import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.FileInputFormat; @@ -705,7 +704,7 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus, } public class Hadoop20FileStatus implements HdfsFileStatus { - private FileStatus fileStatus; + private final FileStatus fileStatus; public Hadoop20FileStatus(FileStatus fileStatus) { this.fileStatus = fileStatus; } @@ -713,6 +712,7 @@ public Hadoop20FileStatus(FileStatus fileStatus) { public FileStatus getFileStatus() { return fileStatus; } + @Override public void debugLog() { if (fileStatus != null) { LOG.debug(fileStatus.toString()); @@ -946,4 +946,9 @@ public KerberosNameShim getKerberosNameShim(String name) throws IOException { // Not supported return null; } + + @Override + public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) { + // Not supported + } } diff --git a/shims/common-secure/pom.xml b/shims/common-secure/pom.xml index 98b5ca1..2c56f5d 100644 --- a/shims/common-secure/pom.xml +++ b/shims/common-secure/pom.xml @@ -74,6 +74,11 @@ ${libthrift.version} + org.apache.curator + curator-framework + ${curator.version} + + org.apache.zookeeper zookeeper ${zookeeper.version} diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java index 606f973..53341e0 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java @@ -29,11 +29,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import javax.security.auth.login.LoginException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -66,6 +69,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -73,6 +77,7 @@ import org.apache.hadoop.tools.HadoopArchives; import 
org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.client.ZooKeeperSaslClient; import com.google.common.primitives.Longs; @@ -88,6 +93,7 @@ public String unquoteHtmlChars(String item) { return HtmlQuoting.unquoteHtmlChars(item); } + @Override public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() { return new CombineFileInputFormatShim() { @Override @@ -171,6 +177,7 @@ public void write(DataOutput out) throws IOException { protected boolean isShrinked; protected long shrinkedLength; + @Override public boolean next(K key, V value) throws IOException { while ((curReader == null) @@ -183,11 +190,13 @@ public boolean next(K key, V value) throws IOException { return true; } + @Override public K createKey() { K newKey = curReader.createKey(); return (K)(new CombineHiveKey(newKey)); } + @Override public V createValue() { return curReader.createValue(); } @@ -195,10 +204,12 @@ public V createValue() { /** * Return the amount of data processed. */ + @Override public long getPos() throws IOException { return progress; } + @Override public void close() throws IOException { if (curReader != null) { curReader.close(); @@ -209,6 +220,7 @@ public void close() throws IOException { /** * Return progress based on the amount of data processed so far. */ + @Override public float getProgress() throws IOException { return Math.min(1.0f, progress / (float) (split.getLength())); } @@ -309,6 +321,7 @@ protected boolean initNextRecordReader(K key) throws IOException { CombineFileInputFormat implements HadoopShims.CombineFileInputFormatShim { + @Override public Path[] getInputPathsShim(JobConf conf) { try { return FileInputFormat.getInputPaths(conf); @@ -339,7 +352,7 @@ public void createPool(JobConf conf, PathFilter... filters) { super.setMaxSplitSize(minSize); } - InputSplit[] splits = (InputSplit[]) super.getSplits(job, numSplits); + InputSplit[] splits = super.getSplits(job, numSplits); ArrayList inputSplitShims = new ArrayList(); for (int pos = 0; pos < splits.length; pos++) { @@ -359,10 +372,12 @@ public void createPool(JobConf conf, PathFilter... filters) { return inputSplitShims.toArray(new InputSplitShim[inputSplitShims.size()]); } + @Override public InputSplitShim getInputSplitShim() throws IOException { return new InputSplitShim(); } + @Override public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split, Reporter reporter, Class> rrClass) @@ -373,6 +388,7 @@ public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim spli } + @Override public String getInputFormatClassName() { return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"; } @@ -401,6 +417,7 @@ public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir, * the archive as compared to the full path in case of earlier versions. * See this api in Hadoop20Shims for comparison. 
*/ + @Override public URI getHarUri(URI original, URI base, URI originalBase) throws URISyntaxException { URI relative = originalBase.relativize(original); @@ -431,6 +448,7 @@ public void commitTask(TaskAttemptContext taskContext) { } public void abortTask(TaskAttemptContext taskContext) { } } + @Override public void prepareJobOutput(JobConf conf) { conf.setOutputCommitter(NullOutputCommitter.class); @@ -686,4 +704,58 @@ public void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action) throws IOException, AccessControlException, Exception { DefaultFileAccess.checkFileAccess(fs, stat, action); } + + @Override + public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException { + // ZooKeeper property name to pick the correct JAAS conf section + final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient"; + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME); + + principal = getResolvedPrincipal(principal); + JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile); + + // Install the Configuration in the runtime. + javax.security.auth.login.Configuration.setConfiguration(jaasConf); + } + + /** + * A JAAS configuration for ZooKeeper clients intended to use for SASL + * Kerberos. + */ + private static class JaasConfiguration extends javax.security.auth.login.Configuration { + // Current installed Configuration + private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration + .getConfiguration(); + private final String loginContextName; + private final String principal; + private final String keyTabFile; + + public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) { + this.loginContextName = hiveLoginContextName; + this.principal = principal; + this.keyTabFile = keyTabFile; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + if (loginContextName.equals(appName)) { + Map krbOptions = new HashMap(); + krbOptions.put("doNotPrompt", "true"); + krbOptions.put("storeKey", "true"); + krbOptions.put("useKeyTab", "true"); + krbOptions.put("principal", principal); + krbOptions.put("keyTab", keyTabFile); + krbOptions.put("refreshKrb5Config", "true"); + AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions); + return new AppConfigurationEntry[] { hiveZooKeeperClientEntry }; + } + // Try the base config + if (baseConfig != null) { + return baseConfig.getAppConfigurationEntry(appName); + } + return null; + } + } + } diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java index 0bb2763..d86c5cb 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java @@ -25,6 +25,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; @@ -111,7 +112,7 @@ public boolean removeToken(DelegationTokenIdentifier 
tokenIdentifier) throws Tok private Object rawStore; @Override - public void setStore(Object rawStore) throws TokenStoreException { + public void init(Object rawStore, ServerMode smode) throws TokenStoreException { this.rawStore = rawStore; } @@ -148,5 +149,4 @@ public void close() throws IOException { // No-op. } - } diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java index f3c2e48..867b4ed 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; /** @@ -108,6 +109,10 @@ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) */ List getAllDelegationTokenIdentifiers() throws TokenStoreException; - void setStore(Object hmsHandler) throws TokenStoreException; + /** + * @param hmsHandler ObjectStore used by DBTokenStore + * @param smode Indicate whether this is a metastore or hiveserver2 token store + */ + void init(Object hmsHandler, ServerMode smode); } diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java index 56735d8..624ac6b 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java @@ -308,6 +308,10 @@ static String encodeIdentifier(byte[] identifier) { "hive.cluster.delegation.token.store.class"; public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = "hive.cluster.delegation.token.store.zookeeper.connectString"; + // alternate connect string specification configuration + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = + "hive.zookeeper.quorum"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = @@ -315,7 +319,7 @@ static String encodeIdentifier(byte[] identifier) { public static final String DELEGATION_TOKEN_STORE_ZK_ACL = "hive.cluster.delegation.token.store.zookeeper.acl"; public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = - "/hive/cluster/delegation"; + "/hivedelegation"; public Server() throws TTransportException { try { @@ -417,7 +421,7 @@ protected DelegationTokenStore getTokenStore(Configuration conf) } @Override - public void startDelegationTokenSecretManager(Configuration conf, Object rawStore) + public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode) throws IOException{ long secretKeyInterval = conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, @@ -430,7 +434,7 @@ public void startDelegationTokenSecretManager(Configuration conf, Object rawStor DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); DelegationTokenStore dts = getTokenStore(conf); - dts.setStore(rawStore); + dts.init(rawStore, smode); secretManager = new 
TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java index 9908aa4..cf60b7c 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; /** @@ -108,8 +109,7 @@ public void close() throws IOException { } @Override - public void setStore(Object hmsHandler) throws TokenStoreException { + public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { // no-op } - } diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java index 4ccf895..8146d51 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java @@ -265,6 +265,7 @@ protected void removeExpiredTokens() { /** * Extension of rollMasterKey to remove expired keys from store. + * * @throws IOException */ protected void rollMasterKeyExt() throws IOException { @@ -273,18 +274,21 @@ protected void rollMasterKeyExt() throws IOException { HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this); List keysAfterRoll = Arrays.asList(getAllKeys()); for (DelegationKey key : keysAfterRoll) { - keys.remove(key.getKeyId()); - if (key.getKeyId() == currentKeyId) { - tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); - } + keys.remove(key.getKeyId()); + if (key.getKeyId() == currentKeyId) { + tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); + } } for (DelegationKey expiredKey : keys.values()) { LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); - tokenStore.removeMasterKey(expiredKey.getKeyId()); + try { + tokenStore.removeMasterKey(expiredKey.getKeyId()); + } catch (Exception e) { + LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e); + } } } - /** * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access * restriction (there would not be an need to clone the remove thread if the remove logic was diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java index 8683496..16a52e4 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java @@ -20,24 +20,28 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; +import 
org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
@@ -56,26 +60,35 @@ private static final String NODE_TOKENS = "/tokens";

   private String rootNode = "";
-  private volatile ZooKeeper zkSession;
+  private volatile CuratorFramework zkSession;
   private String zkConnectString;
   private final int zkSessionTimeout = 3000;
-  private long connectTimeoutMillis = -1;
-  private List newNodeAcl = Ids.OPEN_ACL_UNSAFE;
-
-  private class ZooKeeperWatcher implements Watcher {
-    public void process(org.apache.zookeeper.WatchedEvent event) {
-      LOGGER.info(event.toString());
-      if (event.getState() == Watcher.Event.KeeperState.Expired) {
-        LOGGER.warn("ZooKeeper session expired, discarding connection");
-        try {
-          zkSession.close();
-        } catch (Throwable e) {
-          LOGGER.warn("Failed to close connection on expired session", e);
-        }
-      }
+  private int connectTimeoutMillis = -1;
+  private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
+
+  /**
+   * ACLProvider permissions will be used in case parent dirs need to be created
+   */
+  private final ACLProvider aclDefaultProvider = new ACLProvider() {
+
+    @Override
+    public List getDefaultAcl() {
+      return newNodeAcl;
     }
-  }
+    @Override
+    public List getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+
+  private ServerMode serverMode;
+
+  private final String WHEN_ZK_DSTORE_MSG = "when ZooKeeper-based delegation token storage is enabled " +
+      "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+  private Configuration conf;

   /**
    * Default constructor for dynamic instantiation w/ Configurable
@@ -84,93 +97,74 @@ public void process(org.apache.zookeeper.WatchedEvent event) {
   protected ZooKeeperTokenStore() {
   }

-  public ZooKeeperTokenStore(String hostPort) {
-    this.zkConnectString = hostPort;
-    init();
-  }
-
-  private ZooKeeper getSession() {
-    if (zkSession == null || zkSession.getState() == States.CLOSED) {
-      synchronized (this) {
-        if (zkSession == null || zkSession.getState() == States.CLOSED) {
-          try {
-            zkSession = createConnectedClient(this.zkConnectString, this.zkSessionTimeout,
-                this.connectTimeoutMillis, new ZooKeeperWatcher());
-          } catch (IOException ex) {
-            throw new TokenStoreException("Token store error.", ex);
-          }
-        }
+  private CuratorFramework getSession() {
+    if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+      synchronized (this) {
+        if (zkSession 
== null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString) + .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis) + .aclProvider(aclDefaultProvider) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zkSession.start(); } + } } return zkSession; } - /** - * Create a ZooKeeper session that is in connected state. - * - * @param connectString ZooKeeper connect String - * @param sessionTimeout ZooKeeper session timeout - * @param connectTimeout milliseconds to wait for connection, 0 or negative value means no wait - * @param watchers - * @return - * @throws InterruptedException - * @throws IOException - */ - public static ZooKeeper createConnectedClient(String connectString, - int sessionTimeout, long connectTimeout, final Watcher... watchers) - throws IOException { - final CountDownLatch connected = new CountDownLatch(1); - Watcher connectWatcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - switch (event.getState()) { - case SyncConnected: - connected.countDown(); - break; - } - for (Watcher w : watchers) { - w.process(event); - } - } - }; - ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, connectWatcher); - if (connectTimeout > 0) { - try { - if (!connected.await(connectTimeout, TimeUnit.MILLISECONDS)) { - zk.close(); - throw new IOException("Timeout waiting for connection after " - + connectTimeout + "ms"); - } - } catch (InterruptedException e) { - throw new IOException("Error waiting for connection.", e); - } + private void setupJAASConfig(Configuration conf) throws IOException { + if (!UserGroupInformation.getLoginUser().isFromKeytab()) { + // The process has not logged in using keytab + // this should be a test mode, can't use keytab to authenticate + // with zookeeper. 
+ LOGGER.warn("Login is not from keytab"); + return; } - return zk; + + String principal; + String keytab; + switch (serverMode) { + case METASTORE: + principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); + break; + case HIVESERVER2: + principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); + break; + default: + throw new AssertionError("Unexpected server mode " + serverMode); + } + ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keytab); + } + + private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { + String val = conf.get(param); + if (val == null || val.trim().isEmpty()) { + throw new IOException("Configuration parameter " + param + " should be set, " + + WHEN_ZK_DSTORE_MSG); + } + return val; } /** * Create a path if it does not already exist ("mkdir -p") - * @param zk ZooKeeper session * @param path string with '/' separator * @param acl list of ACL entries - * @return - * @throws KeeperException - * @throws InterruptedException + * @throws TokenStoreException */ - public static String ensurePath(ZooKeeper zk, String path, List acl) throws KeeperException, - InterruptedException { - String[] pathComps = StringUtils.splitByWholeSeparator(path, "/"); - String currentPath = ""; - for (String pathComp : pathComps) { - currentPath += "/" + pathComp; - try { - String node = zk.create(currentPath, new byte[0], acl, - CreateMode.PERSISTENT); - LOGGER.info("Created path: " + node); - } catch (KeeperException.NodeExistsException e) { - } + public void ensurePath(String path, List acl) + throws TokenStoreException { + try { + CuratorFramework zk = getSession(); + String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(acl).forPath(path); + LOGGER.info("Created path: {} ", node); + } catch (KeeperException.NodeExistsException e) { + // node already exists + } catch (Exception e) { + throw new TokenStoreException("Error creating path " + path, e); } - return currentPath; } /** @@ -234,45 +228,24 @@ public static int getPermFromString(String permString) { return acl; } - private void init() { - if (this.zkConnectString == null) { - throw new IllegalStateException("Not initialized"); - } - + private void initClientAndPaths() { if (this.zkSession != null) { - try { - this.zkSession.close(); - } catch (InterruptedException ex) { - LOGGER.warn("Failed to close existing session.", ex); - } + this.zkSession.close(); } - ZooKeeper zk = getSession(); - try { - ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl); - ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl); - } catch (Exception e) { - throw new TokenStoreException("Failed to validate token path.", e); - } + ensurePath(rootNode + NODE_KEYS, newNodeAcl); + ensurePath(rootNode + NODE_TOKENS, newNodeAcl); + } catch (TokenStoreException e) { + throw e; + } } @Override public void setConf(Configuration conf) { if (conf == null) { - throw new IllegalArgumentException("conf is null"); + throw new IllegalArgumentException("conf is null"); } - this.zkConnectString = conf.get( - HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); - this.connectTimeoutMillis = conf.getLong( - HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1); - this.rootNode = conf.get( - 
HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, - HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT); - String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); - if (StringUtils.isNotBlank(csv)) { - this.newNodeAcl = parseACLs(csv); - } - init(); + this.conf = conf; } @Override @@ -280,15 +253,18 @@ public Configuration getConf() { return null; // not required } - private Map getAllKeys() throws KeeperException, - InterruptedException { + private Map getAllKeys() throws KeeperException, InterruptedException { String masterKeyNode = rootNode + NODE_KEYS; - ZooKeeper zk = getSession(); - List nodes = zk.getChildren(masterKeyNode, false); + + // get children of key node + List nodes = zkGetChildren(masterKeyNode); + + // read each child node, add to results Map result = new HashMap(); for (String node : nodes) { - byte[] data = zk.getData(masterKeyNode + "/" + node, false, null); + String nodePath = masterKeyNode + "/" + node; + byte[] data = zkGetData(nodePath); if (data != null) { result.put(getSeq(node), data); } @@ -296,6 +272,26 @@ public Configuration getConf() { return result; } + private List zkGetChildren(String path) { + CuratorFramework zk = getSession(); + try { + return zk.getChildren().forPath(path); + } catch (Exception e) { + throw new TokenStoreException("Error getting children for " + path, e); + } + } + + private byte[] zkGetData(String nodePath) { + CuratorFramework zk = getSession(); + try { + return zk.getData().forPath(nodePath); + } catch (KeeperException.NoNodeException ex) { + return null; + } catch (Exception e) { + throw new TokenStoreException("Error reading " + nodePath, e); + } + } + private int getSeq(String path) { String[] pathComps = path.split("/"); return Integer.parseInt(pathComps[pathComps.length-1]); @@ -303,44 +299,45 @@ private int getSeq(String path) { @Override public int addMasterKey(String s) { + String keysPath = rootNode + NODE_KEYS + "/"; + CuratorFramework zk = getSession(); + String newNode; try { - ZooKeeper zk = getSession(); - String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl, - CreateMode.PERSISTENT_SEQUENTIAL); - LOGGER.info("Added key {}", newNode); - return getSeq(newNode); - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); + newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) + .forPath(keysPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + keysPath, e); } + LOGGER.info("Added key {}", newNode); + return getSeq(newNode); } @Override public void updateMasterKey(int keySeq, String s) { + CuratorFramework zk = getSession(); + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); try { - ZooKeeper zk = getSession(); - zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(), - -1); - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); + zk.setData().forPath(keyPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error setting data in " + keyPath, e); } } @Override public boolean removeMasterKey(int keySeq) { + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + zkDelete(keyPath); + return true; + } + + private void zkDelete(String 
path) { + CuratorFramework zk = getSession(); try { - ZooKeeper zk = getSession(); - zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1); - return true; + zk.delete().forPath(path); } catch (KeeperException.NoNodeException ex) { - return false; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); + // already deleted + } catch (Exception e) { + throw new TokenStoreException("Error deleting " + path, e); } } @@ -374,67 +371,42 @@ private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { @Override public boolean addToken(DelegationTokenIdentifier tokenIdentifier, DelegationTokenInformation token) { + byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); + String tokenPath = getTokenPath(tokenIdentifier); + CuratorFramework zk = getSession(); + String newNode; try { - ZooKeeper zk = getSession(); - byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); - String newNode = zk.create(getTokenPath(tokenIdentifier), - tokenBytes, newNodeAcl, CreateMode.PERSISTENT); - LOGGER.info("Added token: {}", newNode); - return true; - } catch (KeeperException.NodeExistsException ex) { - return false; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); + newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) + .forPath(tokenPath, tokenBytes); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + tokenPath, e); } + + LOGGER.info("Added token: {}", newNode); + return true; } @Override public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { - try { - ZooKeeper zk = getSession(); - zk.delete(getTokenPath(tokenIdentifier), -1); - return true; - } catch (KeeperException.NoNodeException ex) { - return false; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); - } + String tokenPath = getTokenPath(tokenIdentifier); + zkDelete(tokenPath); + return true; } @Override public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); try { - ZooKeeper zk = getSession(); - byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null); - try { - return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); - } catch (Exception ex) { - throw new TokenStoreException("Failed to decode token", ex); - } - } catch (KeeperException.NoNodeException ex) { - return null; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); + return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception ex) { + throw new TokenStoreException("Failed to decode token", ex); } } @Override public List getAllDelegationTokenIdentifiers() { String containerNode = rootNode + NODE_TOKENS; - final List nodes; - try { - nodes = getSession().getChildren(containerNode, false); - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); - } + final List nodes = zkGetChildren(containerNode); List result = new java.util.ArrayList( nodes.size()); for (String node : nodes) { @@ -452,17 
+424,44 @@ public DelegationTokenInformation getToken(DelegationTokenIdenti
   @Override
   public void close() throws IOException {
     if (this.zkSession != null) {
-      try {
-        this.zkSession.close();
-      } catch (InterruptedException ex) {
-        LOGGER.warn("Failed to close existing session.", ex);
-      }
+      this.zkSession.close();
     }
   }

   @Override
-  public void setStore(Object hmsHandler) throws TokenStoreException {
-    // no-op.
+  public void init(Object objectStore, ServerMode smode) {
+    this.serverMode = smode;
+    zkConnectString = conf.get(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+      // try alternate config param
+      zkConnectString = conf.get(
+          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+      if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+        throw new IllegalArgumentException("Zookeeper connect string has to be specified through " +
+            "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR +
+            " or " +
+            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE +
+            " " + WHEN_ZK_DSTORE_MSG);
+      }
+    }
+    connectTimeoutMillis = conf.getInt(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+    String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+    if (StringUtils.isNotBlank(aclStr)) {
+      this.newNodeAcl = parseACLs(aclStr);
+    }
+    rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+
+    try {
+      // Install the JAAS Configuration for the runtime
+      setupJAASConfig(conf);
+    } catch (IOException e) {
+      throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " +
+          e.getMessage(), e);
+    }
+    initClientAndPaths();
   }
 }
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 9850405..f079cf8 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
@@ -240,6 +239,15 @@ public URI getHarUri(URI original, URI base, URI originalBase)
   public String getTokenStrForm(String tokenSignature) throws IOException;

   /**
+   * Dynamically sets up the JAAS configuration that uses Kerberos
+   * @param principal
+   * @param keyTabFile
+   * @throws IOException
+   */
+  public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
+      throws IOException;
+
+  /**
    * Add a delegation token to the given ugi
    * @param ugi
    * @param tokenStr
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
index d0d6c7b..d011c67 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
@@ -99,12 +99,15 @@ public 
abstract TTransport createClientTransport( } public static abstract class Server { + public enum ServerMode { + HIVESERVER2, METASTORE + }; public abstract TTransportFactory createTransportFactory(Map saslProps) throws TTransportException; public abstract TProcessor wrapProcessor(TProcessor processor); public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor); public abstract InetAddress getRemoteAddress(); public abstract void startDelegationTokenSecretManager(Configuration conf, - Object hmsHandler) throws IOException; + Object hmsHandler, ServerMode smode) throws IOException; public abstract String getDelegationToken(String owner, String renewer) throws IOException, InterruptedException; public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
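
Note on the resulting znode layout (not part of the patch): the sketch below is a hypothetical standalone client that mirrors the connection settings the new ZooKeeperTokenStore and the updated test use (3s session timeout, 30s connect timeout, ExponentialBackoffRetry with a 1s base and 3 retries) and lists the metastore's master keys. The ServerMode suffix is appended directly to the root znode with no separator, so the default metastore path is /hivedelegationMETASTORE/keys. The localhost:2181 connect string and the class name TokenStoreLayoutCheck are assumptions for illustration only.

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TokenStoreLayoutCheck {
  public static void main(String[] args) throws Exception {
    // Same client settings as the patched ZooKeeperTokenStore / updated test.
    CuratorFramework zk = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181") // assumption: local test ensemble
        .sessionTimeoutMs(3000)
        .connectionTimeoutMs(30000)
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .build();
    zk.start();

    // Default root znode "/hivedelegation" + ServerMode.METASTORE, as created
    // by ZooKeeperTokenStore.init(null, ServerMode.METASTORE).
    List<String> keys = zk.getChildren().forPath("/hivedelegationMETASTORE/keys");
    for (String key : keys) {
      System.out.println(key); // master keys are persistent sequential nodes
    }
    zk.close();
  }
}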
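
A second sketch (also not part of the patch) showing the revised store lifecycle introduced by replacing setStore(Object) with init(Object, ServerMode): setConf() now only stashes the Configuration, while init() opens the ZooKeeper connection, applies the configured ACLs, installs the JAAS configuration when the process is logged in from a keytab, and creates the /keys and /tokens paths. The class name, the world:anyone ACL, and the connect string are placeholders for an unsecured test setup; the class sits in org.apache.hadoop.hive.thrift so the protected constructor is visible, mirroring the updated TestZooKeeperTokenStore.

package org.apache.hadoop.hive.thrift;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;

public class TokenStoreLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Either the store-specific connect string or the hive.zookeeper.quorum
    // fallback must be set; otherwise init() throws IllegalArgumentException.
    conf.set("hive.zookeeper.quorum", "localhost:2181"); // placeholder ensemble
    conf.set("hive.cluster.delegation.token.store.zookeeper.acl",
        "world:anyone:cdrwa"); // open ACL, only suitable for unsecured tests

    ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
    ts.setConf(conf);                    // only stores the Configuration now
    ts.init(null, ServerMode.METASTORE); // connects, applies ACLs, creates paths
    int keySeq = ts.addMasterKey("key1Data");
    System.out.println("stored master key #" + keySeq);
    ts.close();
  }
}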