diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7d8e5bc..c94f007 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -413,13 +413,18 @@
"The delegation token store implementation. Set to org.apache.hadoop.hive.thrift.ZooKeeperTokenStore for load-balanced cluster."),
METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_CONNECTSTR(
"hive.cluster.delegation.token.store.zookeeper.connectString", "",
- "The ZooKeeper token store connect string."),
+ "The ZooKeeper token store connect string. If this variable is not set, the value set\n" +
+ "in hive.zookeeper.quorum will be used."),
METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ZNODE(
- "hive.cluster.delegation.token.store.zookeeper.znode", "/hive/cluster/delegation",
- "The root path for token store data."),
+ "hive.cluster.delegation.token.store.zookeeper.znode", "/hivedelegation",
+ "The root path for token store data. Note that this is used by both HiveServer2 and\n" +
+ "MetaStore to store delegation Token. One directory gets created for each of them.\n" +
+ "The final directory names would have the servername appended to it (HIVESERVER2,\n" +
+ "METASTORE)."),
METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ACL(
"hive.cluster.delegation.token.store.zookeeper.acl", "",
- "ACL for token store entries. List comma separated all server principals for the cluster."),
+ "ACL for token store entries. List comma separated all server principals for the cluster.\n" +
+ "Defaults to all permissions for the hiveserver2/metastore process user."),
METASTORE_CACHE_PINOBJTYPES("hive.metastore.cache.pinobjtypes", "Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order",
"List of comma separated metastore object types that should be pinned in the cache"),
METASTORE_CONNECTION_POOLING_TYPE("datanucleus.connectionPoolingType", "BONECP",
@@ -1246,10 +1251,13 @@
// Zookeeper related configs
HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "",
- "List of ZooKeeper servers to talk to. This is needed for: " +
- "1. Read/write locks - when hive.lock.manager is set to " +
- "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, " +
- "2. When HiveServer2 supports service discovery via Zookeeper."),
+ "List of ZooKeeper servers to talk to. This is needed for: \n" +
+ "1. Read/write locks - when hive.lock.manager is set to \n" +
+ "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, \n" +
+ "2. When HiveServer2 supports service discovery via Zookeeper.\n" +
+ "3. For delegation token storage if zookeeper store is used, if\n" +
+ "hive.cluster.delegation.token.store.zookeeper.connectString is not set"),
+
HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181",
"The port of ZooKeeper servers to talk to. " +
"If the list of Zookeeper servers specified in hive.zookeeper.quorum," +
diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml
index 6065748..4969708 100644
--- a/hcatalog/webhcat/svr/pom.xml
+++ b/hcatalog/webhcat/svr/pom.xml
@@ -72,14 +72,11 @@
commons-exec
${commons-exec.version}
-
-
-
+
org.apache.curator
curator-framework
${curator.version}
-
org.apache.zookeeper
zookeeper
@@ -203,37 +200,6 @@
-
- org.apache.maven.plugins
- maven-shade-plugin
-
-
- include-curator
-
- package
-
- shade
-
-
- true
-
-
- org.apache.curator
-
-
-
-
- org.apache.curator
- webhcat.org.apache.curator
-
-
-
-
-
-
diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
index b81942a..b2bdafa 100644
--- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
+++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -98,9 +99,9 @@ protected DelegationTokenStore getTokenStore(Configuration conf) throws IOExcept
}
@Override
- public void startDelegationTokenSecretManager(Configuration conf, Object hms)
+ public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode sm)
throws IOException{
- super.startDelegationTokenSecretManager(conf, hms);
+ super.startDelegationTokenSecretManager(conf, hms, sm);
isMetastoreTokenManagerInited = true;
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java
index 8860d30..0b61a62 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestDBTokenStore.java
@@ -37,7 +37,7 @@
public void testDBTokenStore() throws TokenStoreException, MetaException, IOException {
DelegationTokenStore ts = new DBTokenStore();
- ts.setStore(new HMSHandler("Test handler").getMS());
+ ts.init(new HMSHandler("Test handler").getMS(), null);
assertEquals(0, ts.getMasterKeys().length);
assertEquals(false,ts.removeMasterKey(-1));
try{
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
index 83a80b4..26d4d97 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
@@ -24,25 +24,28 @@
import junit.framework.TestCase;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
public class TestZooKeeperTokenStore extends TestCase {
private MiniZooKeeperCluster zkCluster = null;
- private ZooKeeper zkClient = null;
+ private CuratorFramework zkClient = null;
private int zkPort = -1;
private ZooKeeperTokenStore ts;
// connect timeout large enough for slower test environments
private final int connectTimeoutMillis = 30000;
+ private final int sessionTimeoutMillis = 3000;
@Override
protected void setUp() throws Exception {
@@ -53,8 +56,10 @@ protected void setUp() throws Exception {
this.zkCluster = new MiniZooKeeperCluster();
this.zkPort = this.zkCluster.startup(zkDataDir);
- this.zkClient = ZooKeeperTokenStore.createConnectedClient("localhost:" + zkPort, 3000,
- connectTimeoutMillis);
+ this.zkClient = CuratorFrameworkFactory.builder().connectString("localhost:" + zkPort)
+ .sessionTimeoutMs(sessionTimeoutMillis).connectionTimeoutMs(connectTimeoutMillis)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ this.zkClient.start();
}
@Override
@@ -84,14 +89,16 @@ private Configuration createConf(String zkPath) {
public void testTokenStorage() throws Exception {
String ZK_PATH = "/zktokenstore-testTokenStorage";
ts = new ZooKeeperTokenStore();
- ts.setConf(createConf(ZK_PATH));
+ Configuration conf = createConf(ZK_PATH);
+ conf.set(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa");
+ ts.setConf(conf);
+ ts.init(null, ServerMode.METASTORE);
+
+ String metastore_zk_path = ZK_PATH + ServerMode.METASTORE;
int keySeq = ts.addMasterKey("key1Data");
- byte[] keyBytes = zkClient.getData(
- ZK_PATH
- + "/keys/"
- + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT,
- keySeq), false, null);
+ byte[] keyBytes = zkClient.getData().forPath(
+ metastore_zk_path + "/keys/" + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT, keySeq));
assertNotNull(keyBytes);
assertEquals(new String(keyBytes), "key1Data");
@@ -116,8 +123,7 @@ public void testTokenStorage() throws Exception {
HiveDelegationTokenSupport
.encodeDelegationTokenInformation(tokenInfoRead));
- List allIds = ts
- .getAllDelegationTokenIdentifiers();
+ List allIds = ts.getAllDelegationTokenIdentifiers();
assertEquals(1, allIds.size());
Assert.assertEquals(TokenStoreDelegationTokenSecretManager
.encodeWritable(tokenId),
@@ -138,10 +144,10 @@ public void testAclNoAuth() throws Exception {
ts = new ZooKeeperTokenStore();
try {
ts.setConf(conf);
+ ts.init(null, ServerMode.METASTORE);
fail("expected ACL exception");
} catch (DelegationTokenStore.TokenStoreException e) {
- assertEquals(e.getCause().getClass(),
- KeeperException.NoAuthException.class);
+ assertEquals(KeeperException.NoAuthException.class, e.getCause().getClass());
}
}
@@ -159,10 +165,10 @@ public void testAclInvalid() throws Exception {
ts = new ZooKeeperTokenStore();
try {
ts.setConf(conf);
+ ts.init(null, ServerMode.METASTORE);
fail("expected ACL exception");
} catch (DelegationTokenStore.TokenStoreException e) {
- assertEquals(e.getCause().getClass(),
- KeeperException.InvalidACLException.class);
+ assertEquals(KeeperException.InvalidACLException.class, e.getCause().getClass());
}
}
@@ -171,10 +177,11 @@ public void testAclPositive() throws Exception {
Configuration conf = createConf(ZK_PATH);
conf.set(
HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
- "world:anyone:cdrwa,ip:127.0.0.1:cdrwa");
+ "ip:127.0.0.1:cdrwa,world:anyone:cdrwa");
ts = new ZooKeeperTokenStore();
ts.setConf(conf);
- List acl = zkClient.getACL(ZK_PATH, new Stat());
+ ts.init(null, ServerMode.METASTORE);
+ List acl = zkClient.getACL().forPath(ZK_PATH + ServerMode.METASTORE);
assertEquals(2, acl.size());
}
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 3cf43a9..e3240ca 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -149,8 +149,8 @@
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
-import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
@@ -192,6 +192,7 @@
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
@@ -5795,7 +5796,7 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE),
conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
// start delegation token manager
- saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS());
+ saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS(), ServerMode.METASTORE);
transFactory = saslServer.createTransportFactory(
MetaStoreUtils.getMetaStoreSaslProperties(conf));
processor = saslServer.wrapProcessor(
diff --git a/pom.xml b/pom.xml
index c694980..825fa7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
3.4.5
1.1
2.4.0
- 2.5.0
+ 2.6.0
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
index 11dd962..29d0531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
@@ -18,18 +18,12 @@
package org.apache.hadoop.hive.ql.util;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@@ -39,7 +33,7 @@
public class ZooKeeperHiveHelper {
public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName());
public static final String ZOOKEEPER_PATH_SEPARATOR = "/";
- public static final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+
/**
* Get the ensemble server addresses from the configuration. The format is: host1:port,
* host2:port..
@@ -97,59 +91,9 @@ public static String createPathRecursively(ZooKeeper zooKeeperClient, String pat
* A no-op watcher class
*/
public static class DummyWatcher implements Watcher {
+ @Override
public void process(org.apache.zookeeper.WatchedEvent event) {
}
}
- /**
- * Dynamically sets up the JAAS configuration
- * @param principal
- * @param keyTabFile
- */
- public static void setUpJaasConfiguration(String principal, String keyTabFile) {
- JaasConfiguration jaasConf =
- new JaasConfiguration(ZooKeeperHiveHelper.SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
- // Install the Configuration in the runtime.
- javax.security.auth.login.Configuration.setConfiguration(jaasConf);
- }
-
- /**
- * A JAAS configuration for ZooKeeper clients intended to use for SASL Kerberos.
- */
- private static class JaasConfiguration extends javax.security.auth.login.Configuration {
- // Current installed Configuration
- private javax.security.auth.login.Configuration baseConfig =
- javax.security.auth.login.Configuration.getConfiguration();
- private final String loginContextName;
- private final String principal;
- private final String keyTabFile;
-
- public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
- this.loginContextName = hiveLoginContextName;
- this.principal = principal;
- this.keyTabFile = keyTabFile;
- }
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
- if (loginContextName.equals(appName)) {
- Map krbOptions = new HashMap();
- krbOptions.put("doNotPrompt", "true");
- krbOptions.put("storeKey", "true");
- krbOptions.put("useKeyTab", "true");
- krbOptions.put("principal", principal);
- krbOptions.put("keyTab", keyTabFile);
- krbOptions.put("refreshKrb5Config", "true");
- AppConfigurationEntry hiveZooKeeperClientEntry =
- new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
- LoginModuleControlFlag.REQUIRED, krbOptions);
- return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
- }
- // Try the base config
- if (baseConfig != null) {
- return baseConfig.getAppConfigurationEntry(appName);
- }
- return null;
- }
- }
}
diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
index ab34d2d..72bc724 100644
--- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
+++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
@@ -98,7 +99,7 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException {
conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
// start delegation token manager
try {
- saslServer.startDelegationTokenSecretManager(conf, null);
+ saslServer.startDelegationTokenSecretManager(conf, null, ServerMode.HIVESERVER2);
} catch (IOException e) {
throw new TTransportException("Failed to start token manager", e);
}
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 0aab3f9..064b81c 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -53,7 +53,6 @@
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
/**
@@ -178,23 +177,19 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
*/
private void setUpAuthAndAcls(HiveConf hiveConf, List nodeAcls) throws Exception {
if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
- String principal =
- ShimLoader.getHadoopShims().getResolvedPrincipal(
- hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
- String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+ String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
if (principal.isEmpty()) {
throw new IOException(
"HiveServer2 Kerberos principal is empty");
}
+ String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
if (keyTabFile.isEmpty()) {
throw new IOException(
"HiveServer2 Kerberos keytab is empty");
}
- // ZooKeeper property name to pick the correct JAAS conf section
- System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
- ZooKeeperHiveHelper.SASL_LOGIN_CONTEXT_NAME);
+
// Install the JAAS Configuration for the runtime
- ZooKeeperHiveHelper.setUpJaasConfiguration(principal, keyTabFile);
+ ShimLoader.getHadoopShims().setZookeeperClientJaasConfig(principal, keyTabFile);
// Read all to the world
nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
// Create/Delete/Write/Admin to the authenticated user
@@ -212,6 +207,7 @@ private void setUpAuthAndAcls(HiveConf hiveConf, List nodeAcls) throws Exce
* sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
*/
private class DeRegisterWatcher implements Watcher {
+ @Override
public void process(WatchedEvent event) {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
HiveServer2.this.setRegisteredWithZooKeeper(false);
@@ -389,7 +385,7 @@ public static void main(String[] args) {
private final Options options = new Options();
private org.apache.commons.cli.CommandLine commandLine;
private final String serverName;
- private StringBuilder debugMessage = new StringBuilder();
+ private final StringBuilder debugMessage = new StringBuilder();
@SuppressWarnings("static-access")
ServerOptionsProcessor(String serverName) {
@@ -453,7 +449,7 @@ StringBuilder getDebugMessage() {
* The response sent back from {@link ServerOptionsProcessor#parse(String[])}
*/
static class ServerOptionsProcessorResponse {
- private ServerOptionsExecutor serverOptionsExecutor;
+ private final ServerOptionsExecutor serverOptionsExecutor;
ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) {
this.serverOptionsExecutor = serverOptionsExecutor;
@@ -483,6 +479,7 @@ ServerOptionsExecutor getServerOptionsExecutor() {
this.serverName = serverName;
}
+ @Override
public void execute() {
new HelpFormatter().printHelp(serverName, options);
System.exit(0);
@@ -494,6 +491,7 @@ public void execute() {
* This is the default executor, when no option is specified.
*/
static class StartOptionExecutor implements ServerOptionsExecutor {
+ @Override
public void execute() {
try {
startHiveServer2();
@@ -515,6 +513,7 @@ public void execute() {
this.versionNumber = versionNumber;
}
+ @Override
public void execute() {
try {
deleteServerInstancesFromZooKeeper(versionNumber);
diff --git a/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java b/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index d18ae44..179554a 100644
--- a/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ b/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -59,7 +59,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
-import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -705,7 +704,7 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
}
public class Hadoop20FileStatus implements HdfsFileStatus {
- private FileStatus fileStatus;
+ private final FileStatus fileStatus;
public Hadoop20FileStatus(FileStatus fileStatus) {
this.fileStatus = fileStatus;
}
@@ -713,6 +712,7 @@ public Hadoop20FileStatus(FileStatus fileStatus) {
public FileStatus getFileStatus() {
return fileStatus;
}
+ @Override
public void debugLog() {
if (fileStatus != null) {
LOG.debug(fileStatus.toString());
@@ -946,4 +946,9 @@ public KerberosNameShim getKerberosNameShim(String name) throws IOException {
// Not supported
return null;
}
+
+ @Override
+ public void setZookeeperClientJaasConfig(String principal, String keyTabFile) {
+ // Not supported
+ }
}
diff --git a/shims/common-secure/pom.xml b/shims/common-secure/pom.xml
index 98b5ca1..2c56f5d 100644
--- a/shims/common-secure/pom.xml
+++ b/shims/common-secure/pom.xml
@@ -74,6 +74,11 @@
${libthrift.version}
+ org.apache.curator
+ curator-framework
+ ${curator.version}
+
+
org.apache.zookeeper
zookeeper
${zookeeper.version}
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
index 606f973..6d3fa93 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
@@ -29,11 +29,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -66,6 +69,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -73,6 +77,7 @@
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
import com.google.common.primitives.Longs;
@@ -88,6 +93,7 @@ public String unquoteHtmlChars(String item) {
return HtmlQuoting.unquoteHtmlChars(item);
}
+ @Override
public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
return new CombineFileInputFormatShim() {
@Override
@@ -171,6 +177,7 @@ public void write(DataOutput out) throws IOException {
protected boolean isShrinked;
protected long shrinkedLength;
+ @Override
public boolean next(K key, V value) throws IOException {
while ((curReader == null)
@@ -183,11 +190,13 @@ public boolean next(K key, V value) throws IOException {
return true;
}
+ @Override
public K createKey() {
K newKey = curReader.createKey();
return (K)(new CombineHiveKey(newKey));
}
+ @Override
public V createValue() {
return curReader.createValue();
}
@@ -195,10 +204,12 @@ public V createValue() {
/**
* Return the amount of data processed.
*/
+ @Override
public long getPos() throws IOException {
return progress;
}
+ @Override
public void close() throws IOException {
if (curReader != null) {
curReader.close();
@@ -209,6 +220,7 @@ public void close() throws IOException {
/**
* Return progress based on the amount of data processed so far.
*/
+ @Override
public float getProgress() throws IOException {
return Math.min(1.0f, progress / (float) (split.getLength()));
}
@@ -309,6 +321,7 @@ protected boolean initNextRecordReader(K key) throws IOException {
CombineFileInputFormat
implements HadoopShims.CombineFileInputFormatShim {
+ @Override
public Path[] getInputPathsShim(JobConf conf) {
try {
return FileInputFormat.getInputPaths(conf);
@@ -339,7 +352,7 @@ public void createPool(JobConf conf, PathFilter... filters) {
super.setMaxSplitSize(minSize);
}
- InputSplit[] splits = (InputSplit[]) super.getSplits(job, numSplits);
+ InputSplit[] splits = super.getSplits(job, numSplits);
ArrayList inputSplitShims = new ArrayList();
for (int pos = 0; pos < splits.length; pos++) {
@@ -359,10 +372,12 @@ public void createPool(JobConf conf, PathFilter... filters) {
return inputSplitShims.toArray(new InputSplitShim[inputSplitShims.size()]);
}
+ @Override
public InputSplitShim getInputSplitShim() throws IOException {
return new InputSplitShim();
}
+ @Override
public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
Reporter reporter,
Class> rrClass)
@@ -373,6 +388,7 @@ public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim spli
}
+ @Override
public String getInputFormatClassName() {
return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
}
@@ -401,6 +417,7 @@ public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
* the archive as compared to the full path in case of earlier versions.
* See this api in Hadoop20Shims for comparison.
*/
+ @Override
public URI getHarUri(URI original, URI base, URI originalBase)
throws URISyntaxException {
URI relative = originalBase.relativize(original);
@@ -431,6 +448,7 @@ public void commitTask(TaskAttemptContext taskContext) { }
public void abortTask(TaskAttemptContext taskContext) { }
}
+ @Override
public void prepareJobOutput(JobConf conf) {
conf.setOutputCommitter(NullOutputCommitter.class);
@@ -686,4 +704,58 @@ public void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action)
throws IOException, AccessControlException, Exception {
DefaultFileAccess.checkFileAccess(fs, stat, action);
}
+
+ @Override
+ public void setZookeeperClientJaasConfig(String principal, String keyTabFile) throws IOException {
+ // ZooKeeper property name to pick the correct JAAS conf section
+ final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+ System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
+
+ principal = getResolvedPrincipal(principal);
+ JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
+
+ // Install the Configuration in the runtime.
+ javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+ }
+
+ /**
+ * A JAAS configuration for ZooKeeper clients intended to use for SASL
+ * Kerberos.
+ */
+ private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+ // Current installed Configuration
+ private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
+ .getConfiguration();
+ private final String loginContextName;
+ private final String principal;
+ private final String keyTabFile;
+
+ public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
+ this.loginContextName = hiveLoginContextName;
+ this.principal = principal;
+ this.keyTabFile = keyTabFile;
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+ if (loginContextName.equals(appName)) {
+ Map krbOptions = new HashMap();
+ krbOptions.put("doNotPrompt", "true");
+ krbOptions.put("storeKey", "true");
+ krbOptions.put("useKeyTab", "true");
+ krbOptions.put("principal", principal);
+ krbOptions.put("keyTab", keyTabFile);
+ krbOptions.put("refreshKrb5Config", "true");
+ AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions);
+ return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
+ }
+ // Try the base config
+ if (baseConfig != null) {
+ return baseConfig.getAppConfigurationEntry(appName);
+ }
+ return null;
+ }
+ }
+
}
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java
index 0bb2763..d86c5cb 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java
@@ -25,6 +25,7 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
@@ -111,7 +112,7 @@ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws Tok
private Object rawStore;
@Override
- public void setStore(Object rawStore) throws TokenStoreException {
+ public void init(Object rawStore, ServerMode smode) throws TokenStoreException {
this.rawStore = rawStore;
}
@@ -148,5 +149,4 @@ public void close() throws IOException {
// No-op.
}
-
}
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
index f3c2e48..867b4ed 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
/**
@@ -108,6 +109,10 @@ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
*/
List getAllDelegationTokenIdentifiers() throws TokenStoreException;
- void setStore(Object hmsHandler) throws TokenStoreException;
+ /**
+ * @param hmsHandler ObjectStore used by DBTokenStore
+ * @param smode Indicate whether this is a metastore or hiveserver2 token store
+ */
+ void init(Object hmsHandler, ServerMode smode);
}
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
index 56735d8..624ac6b 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
@@ -308,6 +308,10 @@ static String encodeIdentifier(byte[] identifier) {
"hive.cluster.delegation.token.store.class";
public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
"hive.cluster.delegation.token.store.zookeeper.connectString";
+ // alternate connect string specification configuration
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE =
+ "hive.zookeeper.quorum";
+
public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
"hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
@@ -315,7 +319,7 @@ static String encodeIdentifier(byte[] identifier) {
public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
"hive.cluster.delegation.token.store.zookeeper.acl";
public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
- "/hive/cluster/delegation";
+ "/hivedelegation";
public Server() throws TTransportException {
try {
@@ -417,7 +421,7 @@ protected DelegationTokenStore getTokenStore(Configuration conf)
}
@Override
- public void startDelegationTokenSecretManager(Configuration conf, Object rawStore)
+ public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode)
throws IOException{
long secretKeyInterval =
conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
@@ -430,7 +434,7 @@ public void startDelegationTokenSecretManager(Configuration conf, Object rawStor
DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
DelegationTokenStore dts = getTokenStore(conf);
- dts.setStore(rawStore);
+ dts.init(rawStore, smode);
secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime,
tokenRenewInterval,
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
index 9908aa4..cf60b7c 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
@@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
/**
@@ -108,8 +109,7 @@ public void close() throws IOException {
}
@Override
- public void setStore(Object hmsHandler) throws TokenStoreException {
+ public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
// no-op
}
-
}
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
index 4ccf895..8146d51 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
@@ -265,6 +265,7 @@ protected void removeExpiredTokens() {
/**
* Extension of rollMasterKey to remove expired keys from store.
+ *
* @throws IOException
*/
protected void rollMasterKeyExt() throws IOException {
@@ -273,18 +274,21 @@ protected void rollMasterKeyExt() throws IOException {
HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
List keysAfterRoll = Arrays.asList(getAllKeys());
for (DelegationKey key : keysAfterRoll) {
- keys.remove(key.getKeyId());
- if (key.getKeyId() == currentKeyId) {
- tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
- }
+ keys.remove(key.getKeyId());
+ if (key.getKeyId() == currentKeyId) {
+ tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+ }
}
for (DelegationKey expiredKey : keys.values()) {
LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
- tokenStore.removeMasterKey(expiredKey.getKeyId());
+ try {
+ tokenStore.removeMasterKey(expiredKey.getKeyId());
+ } catch (Exception e) {
+ LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e);
+ }
}
}
-
/**
* Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
* restriction (there would not be an need to clone the remove thread if the remove logic was
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
index 8683496..418ce40 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
@@ -20,24 +20,28 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
@@ -56,26 +60,35 @@
private static final String NODE_TOKENS = "/tokens";
private String rootNode = "";
- private volatile ZooKeeper zkSession;
+ private volatile CuratorFramework zkSession;
private String zkConnectString;
private final int zkSessionTimeout = 3000;
- private long connectTimeoutMillis = -1;
- private List newNodeAcl = Ids.OPEN_ACL_UNSAFE;
-
- private class ZooKeeperWatcher implements Watcher {
- public void process(org.apache.zookeeper.WatchedEvent event) {
- LOGGER.info(event.toString());
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
- LOGGER.warn("ZooKeeper session expired, discarding connection");
- try {
- zkSession.close();
- } catch (Throwable e) {
- LOGGER.warn("Failed to close connection on expired session", e);
- }
- }
+ private int connectTimeoutMillis = -1;
+ private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
+
+ /**
+ * ACLProvider permissions will be used in case parent dirs need to be created
+ */
+ private final ACLProvider aclDefaultProvider = new ACLProvider() {
+
+ @Override
+ public List getDefaultAcl() {
+ return newNodeAcl;
}
- }
+ @Override
+ public List getAclForPath(String path) {
+ return getDefaultAcl();
+ }
+ };
+
+
+ private ServerMode serverMode;
+
+ private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+ + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+ private Configuration conf;
/**
* Default constructor for dynamic instantiation w/ Configurable
@@ -84,68 +97,54 @@ public void process(org.apache.zookeeper.WatchedEvent event) {
protected ZooKeeperTokenStore() {
}
- public ZooKeeperTokenStore(String hostPort) {
- this.zkConnectString = hostPort;
- init();
- }
-
- private ZooKeeper getSession() {
- if (zkSession == null || zkSession.getState() == States.CLOSED) {
- synchronized (this) {
- if (zkSession == null || zkSession.getState() == States.CLOSED) {
- try {
- zkSession = createConnectedClient(this.zkConnectString, this.zkSessionTimeout,
- this.connectTimeoutMillis, new ZooKeeperWatcher());
- } catch (IOException ex) {
- throw new TokenStoreException("Token store error.", ex);
- }
- }
+ private CuratorFramework getSession() {
+ if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+ synchronized (this) {
+ if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+ zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString)
+ .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+ .aclProvider(aclDefaultProvider)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zkSession.start();
}
+ }
}
return zkSession;
}
- /**
- * Create a ZooKeeper session that is in connected state.
- *
- * @param connectString ZooKeeper connect String
- * @param sessionTimeout ZooKeeper session timeout
- * @param connectTimeout milliseconds to wait for connection, 0 or negative value means no wait
- * @param watchers
- * @return
- * @throws InterruptedException
- * @throws IOException
- */
- public static ZooKeeper createConnectedClient(String connectString,
- int sessionTimeout, long connectTimeout, final Watcher... watchers)
- throws IOException {
- final CountDownLatch connected = new CountDownLatch(1);
- Watcher connectWatcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getState()) {
- case SyncConnected:
- connected.countDown();
- break;
- }
- for (Watcher w : watchers) {
- w.process(event);
- }
- }
- };
- ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, connectWatcher);
- if (connectTimeout > 0) {
- try {
- if (!connected.await(connectTimeout, TimeUnit.MILLISECONDS)) {
- zk.close();
- throw new IOException("Timeout waiting for connection after "
- + connectTimeout + "ms");
- }
- } catch (InterruptedException e) {
- throw new IOException("Error waiting for connection.", e);
- }
+ private void setupJAASConfig(Configuration conf) throws IOException {
+ if (!UserGroupInformation.getLoginUser().isFromKeytab()) {
+ // The process has not logged in using keytab
+ // this should be a test mode, can't use keytab to authenticate
+ // with zookeeper.
+ LOGGER.warn("Login is not from keytab");
+ return;
}
- return zk;
+
+ String principal;
+ String keytab;
+ switch (serverMode) {
+ case METASTORE:
+ principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal");
+ keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file");
+ break;
+ case HIVESERVER2:
+ principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal");
+ keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab");
+ break;
+ default:
+ throw new AssertionError("Unexpected server mode " + serverMode);
+ }
+ ShimLoader.getHadoopShims().setZookeeperClientJaasConfig(principal, keytab);
+ }
+
+ private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
+ String val = conf.get(param);
+ if (val == null || val.trim().isEmpty()) {
+ throw new IOException("Configuration parameter " + param + " should be set, "
+ + WHEN_ZK_DSTORE_MSG);
+ }
+ return val;
}
/**
@@ -157,20 +156,18 @@ public void process(WatchedEvent event) {
* @throws KeeperException
* @throws InterruptedException
*/
- public static String ensurePath(ZooKeeper zk, String path, List acl) throws KeeperException,
- InterruptedException {
- String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
- String currentPath = "";
- for (String pathComp : pathComps) {
- currentPath += "/" + pathComp;
- try {
- String node = zk.create(currentPath, new byte[0], acl,
- CreateMode.PERSISTENT);
- LOGGER.info("Created path: " + node);
- } catch (KeeperException.NodeExistsException e) {
- }
+ public void ensurePath(String path, List acl)
+ throws TokenStoreException {
+ try {
+ CuratorFramework zk = getSession();
+ String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .withACL(acl).forPath(path);
+ LOGGER.info("Created path: {} ", node);
+ } catch (KeeperException.NodeExistsException e) {
+ // node already exists
+ } catch (Exception e) {
+ throw new TokenStoreException("Error creating path " + path, e);
}
- return currentPath;
}
/**
@@ -234,45 +231,24 @@ public static int getPermFromString(String permString) {
return acl;
}
- private void init() {
- if (this.zkConnectString == null) {
- throw new IllegalStateException("Not initialized");
- }
-
+ private void initClientAndPaths() {
if (this.zkSession != null) {
- try {
- this.zkSession.close();
- } catch (InterruptedException ex) {
- LOGGER.warn("Failed to close existing session.", ex);
- }
+ this.zkSession.close();
}
- ZooKeeper zk = getSession();
-
try {
- ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl);
- ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl);
- } catch (Exception e) {
- throw new TokenStoreException("Failed to validate token path.", e);
- }
+ ensurePath(rootNode + NODE_KEYS, newNodeAcl);
+ ensurePath(rootNode + NODE_TOKENS, newNodeAcl);
+ } catch (TokenStoreException e) {
+ throw e;
+ }
}
@Override
public void setConf(Configuration conf) {
if (conf == null) {
- throw new IllegalArgumentException("conf is null");
+ throw new IllegalArgumentException("conf is null");
}
- this.zkConnectString = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
- this.connectTimeoutMillis = conf.getLong(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
- this.rootNode = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT);
- String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
- if (StringUtils.isNotBlank(csv)) {
- this.newNodeAcl = parseACLs(csv);
- }
- init();
+ this.conf = conf;
}
@Override
@@ -280,15 +256,18 @@ public Configuration getConf() {
return null; // not required
}
- private Map getAllKeys() throws KeeperException,
- InterruptedException {
+ private Map getAllKeys() throws KeeperException, InterruptedException {
String masterKeyNode = rootNode + NODE_KEYS;
- ZooKeeper zk = getSession();
- List nodes = zk.getChildren(masterKeyNode, false);
+
+ // get children of key node
+ List nodes = zkGetChildren(masterKeyNode);
+
+ // read each child node, add to results
Map result = new HashMap();
for (String node : nodes) {
- byte[] data = zk.getData(masterKeyNode + "/" + node, false, null);
+ String nodePath = masterKeyNode + "/" + node;
+ byte[] data = zkGetData(nodePath);
if (data != null) {
result.put(getSeq(node), data);
}
@@ -296,6 +275,26 @@ public Configuration getConf() {
return result;
}
+ private List zkGetChildren(String path) {
+ CuratorFramework zk = getSession();
+ try {
+ return zk.getChildren().forPath(path);
+ } catch (Exception e) {
+ throw new TokenStoreException("Error getting children for " + path, e);
+ }
+ }
+
+ private byte[] zkGetData(String nodePath) {
+ CuratorFramework zk = getSession();
+ try {
+ return zk.getData().forPath(nodePath);
+ } catch (KeeperException.NoNodeException ex) {
+ return null;
+ } catch (Exception e) {
+ throw new TokenStoreException("Error reading " + nodePath, e);
+ }
+ }
+
private int getSeq(String path) {
String[] pathComps = path.split("/");
return Integer.parseInt(pathComps[pathComps.length-1]);
@@ -303,44 +302,45 @@ private int getSeq(String path) {
@Override
public int addMasterKey(String s) {
+ String keysPath = rootNode + NODE_KEYS + "/";
+ CuratorFramework zk = getSession();
+ String newNode;
try {
- ZooKeeper zk = getSession();
- String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl,
- CreateMode.PERSISTENT_SEQUENTIAL);
- LOGGER.info("Added key {}", newNode);
- return getSeq(newNode);
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl)
+ .forPath(keysPath, s.getBytes());
+ } catch (Exception e) {
+ throw new TokenStoreException("Error creating new node with path " + keysPath, e);
}
+ LOGGER.info("Added key {}", newNode);
+ return getSeq(newNode);
}
@Override
public void updateMasterKey(int keySeq, String s) {
+ CuratorFramework zk = getSession();
+ String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
try {
- ZooKeeper zk = getSession();
- zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
- -1);
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ zk.setData().forPath(keyPath, s.getBytes());
+ } catch (Exception e) {
+ throw new TokenStoreException("Error setting data in " + keyPath, e);
}
}
@Override
public boolean removeMasterKey(int keySeq) {
+ String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
+ zkDelete(keyPath);
+ return true;
+ }
+
+ private void zkDelete(String path) {
+ CuratorFramework zk = getSession();
try {
- ZooKeeper zk = getSession();
- zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
- return true;
+ zk.delete().forPath(path);
} catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ // already deleted
+ } catch (Exception e) {
+ throw new TokenStoreException("Error deleting " + path, e);
}
}
@@ -374,67 +374,42 @@ private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
@Override
public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
DelegationTokenInformation token) {
+ byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+ String tokenPath = getTokenPath(tokenIdentifier);
+ CuratorFramework zk = getSession();
+ String newNode;
try {
- ZooKeeper zk = getSession();
- byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
- String newNode = zk.create(getTokenPath(tokenIdentifier),
- tokenBytes, newNodeAcl, CreateMode.PERSISTENT);
- LOGGER.info("Added token: {}", newNode);
- return true;
- } catch (KeeperException.NodeExistsException ex) {
- return false;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl)
+ .forPath(tokenPath, tokenBytes);
+ } catch (Exception e) {
+ throw new TokenStoreException("Error creating new node with path " + tokenPath, e);
}
+
+ LOGGER.info("Added token: {}", newNode);
+ return true;
}
@Override
public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
- try {
- ZooKeeper zk = getSession();
- zk.delete(getTokenPath(tokenIdentifier), -1);
- return true;
- } catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
- }
+ String tokenPath = getTokenPath(tokenIdentifier);
+ zkDelete(tokenPath);
+ return true;
}
@Override
public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+ byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier));
try {
- ZooKeeper zk = getSession();
- byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
- try {
- return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
- } catch (Exception ex) {
- throw new TokenStoreException("Failed to decode token", ex);
- }
- } catch (KeeperException.NoNodeException ex) {
- return null;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+ } catch (Exception ex) {
+ throw new TokenStoreException("Failed to decode token", ex);
}
}
@Override
public List getAllDelegationTokenIdentifiers() {
String containerNode = rootNode + NODE_TOKENS;
- final List nodes;
- try {
- nodes = getSession().getChildren(containerNode, false);
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
- }
+ final List nodes = zkGetChildren(containerNode);
List result = new java.util.ArrayList(
nodes.size());
for (String node : nodes) {
@@ -452,17 +427,44 @@ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdenti
@Override
public void close() throws IOException {
if (this.zkSession != null) {
- try {
- this.zkSession.close();
- } catch (InterruptedException ex) {
- LOGGER.warn("Failed to close existing session.", ex);
- }
+ this.zkSession.close();
}
}
@Override
- public void setStore(Object hmsHandler) throws TokenStoreException {
- // no-op.
+ public void init(Object objectStore, ServerMode smode) {
+ this.serverMode = smode;
+ zkConnectString = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+ if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+ // try alternate config param
+ zkConnectString = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+ if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+ throw new IllegalArgumentException("Zookeeper connect string has to be specifed through "
+ + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+ + " or "
+ + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+ + WHEN_ZK_DSTORE_MSG);
+ }
+ }
+ connectTimeoutMillis = conf.getInt(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+ String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+ if (StringUtils.isNotBlank(csv)) {
+ this.newNodeAcl = parseACLs(csv);
+ }
+ rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+
+ try {
+ // Install the JAAS Configuration for the runtime
+ setupJAASConfig(conf);
+ } catch (IOException e) {
+ throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client "
+ + e.getMessage(), e);
+ }
+ initClientAndPaths();
}
}
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 9850405..87a05f3 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -61,7 +61,6 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
@@ -240,6 +239,14 @@ public URI getHarUri(URI original, URI base, URI originalBase)
public String getTokenStrForm(String tokenSignature) throws IOException;
/**
+ * Dynamically sets up the JAAS configuration
+ * @param principal
+ * @param keyTabFile
+ * @throws IOException
+ */
+ public void setZookeeperClientJaasConfig(String principal, String keyTabFile) throws IOException;
+
+ /**
* Add a delegation token to the given ugi
* @param ugi
* @param tokenStr
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
index d0d6c7b..d011c67 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
@@ -99,12 +99,15 @@ public abstract TTransport createClientTransport(
}
public static abstract class Server {
+ public enum ServerMode {
+ HIVESERVER2, METASTORE
+ };
public abstract TTransportFactory createTransportFactory(Map saslProps) throws TTransportException;
public abstract TProcessor wrapProcessor(TProcessor processor);
public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
public abstract InetAddress getRemoteAddress();
public abstract void startDelegationTokenSecretManager(Configuration conf,
- Object hmsHandler) throws IOException;
+ Object hmsHandler, ServerMode smode) throws IOException;
public abstract String getDelegationToken(String owner, String renewer)
throws IOException, InterruptedException;
public abstract String getDelegationTokenWithService(String owner, String renewer, String service)