Index: shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java =================================================================== --- shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java (revision 0) +++ shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java (revision 0) @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.thrift; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.junit.Assert; + +public class TestZooKeeperTokenStore extends TestCase { + + private MiniZooKeeperCluster zkCluster = null; + private ZooKeeper zkClient = null; + + @Override + protected void setUp() throws Exception { + File zkDataDir = new File(System.getProperty("java.io.tmpdir")); + if (this.zkCluster != null) { + throw new IOException("Cluster already running"); + } + this.zkCluster = new MiniZooKeeperCluster(); + this.zkCluster.startup(zkDataDir); + this.zkClient = new ZooKeeper("localhost:" + + this.zkCluster.getClientPort(), 300, null); + } + + @Override + protected void tearDown() throws Exception { + this.zkClient.close(); + this.zkCluster.shutdown(); + this.zkCluster = null; + } + + private Configuration createConf(String zkPath) { + Configuration conf = new Configuration(); + conf.set( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, + "localhost:" + this.zkCluster.getClientPort()); + conf.set( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, + zkPath); + return conf; + } + + public void testTokenStorage() throws Exception { + String ZK_PATH = "/zktokenstore-testTokenStorage"; + ZooKeeperTokenStore ts = new ZooKeeperTokenStore(); + ts.setConf(createConf(ZK_PATH)); + + int keySeq = ts.addMasterKey("key1Data"); + byte[] keyBytes = zkClient.getData( + ZK_PATH + + "/keys/" + + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT, + keySeq), false, null); + assertNotNull(keyBytes); + assertEquals(new String(keyBytes), "key1Data"); + + int keySeq2 = ts.addMasterKey("key2Data"); + assertEquals("keys sequential", keySeq + 1, keySeq2); + assertEquals("expected number keys", 2, ts.getMasterKeys().length); + + ts.removeMasterKey(keySeq); + assertEquals("expected number keys", 1, ts.getMasterKeys().length); + + // tokens + DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), new Text("realUser")); + DelegationTokenInformation tokenInfo = new DelegationTokenInformation( + 99, "password".getBytes()); + ts.addToken(tokenId, tokenInfo); + DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId); + assertEquals(tokenInfo.getRenewDate(), tokenInfoRead.getRenewDate()); + assertNotSame(tokenInfo, tokenInfoRead); + Assert.assertArrayEquals(HiveDelegationTokenSupport + .encodeDelegationTokenInformation(tokenInfo), + HiveDelegationTokenSupport + .encodeDelegationTokenInformation(tokenInfoRead)); + + List allIds = ts + .getAllDelegationTokenIdentifiers(); + assertEquals(1, allIds.size()); + Assert.assertEquals(TokenStoreDelegationTokenSecretManager + .encodeWritable(tokenId), + TokenStoreDelegationTokenSecretManager.encodeWritable(allIds + .get(0))); + + assertTrue(ts.removeToken(tokenId)); + assertEquals(0, ts.getAllDelegationTokenIdentifiers().size()); + } + + public void testAclNoAuth() throws Exception { + String ZK_PATH = "/zktokenstore-testAclNoAuth"; + Configuration conf = createConf(ZK_PATH); + conf.set( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, + "ip:127.0.0.1:r"); + + ZooKeeperTokenStore ts = new ZooKeeperTokenStore(); + try { + ts.setConf(conf); + fail("expected ACL exception"); + } catch (DelegationTokenStore.TokenStoreException e) { + assertEquals(e.getCause().getClass(), + KeeperException.NoAuthException.class); + } + } + + public void testAclInvalid() throws Exception { + String ZK_PATH = "/zktokenstore-testAclInvalid"; + String aclString = "sasl:hive/host@TEST.DOMAIN:cdrwa, fail-parse-ignored"; + Configuration conf = createConf(ZK_PATH); + conf.set( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, + aclString); + + List aclList = ZooKeeperTokenStore.parseACLs(aclString); + assertEquals(1, aclList.size()); + + ZooKeeperTokenStore ts = new ZooKeeperTokenStore(); + try { + ts.setConf(conf); + fail("expected ACL exception"); + } catch (DelegationTokenStore.TokenStoreException e) { + assertEquals(e.getCause().getClass(), + KeeperException.InvalidACLException.class); + } + } + + public void testAclPositive() throws Exception { + String ZK_PATH = "/zktokenstore-testAcl"; + Configuration conf = createConf(ZK_PATH); + conf.set( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, + "world:anyone:cdrwa,ip:127.0.0.1:cdrwa"); + ZooKeeperTokenStore ts = new ZooKeeperTokenStore(); + ts.setConf(conf); + List acl = zkClient.getACL(ZK_PATH, new Stat()); + assertEquals(2, acl.size()); + } + +} Index: shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java =================================================================== --- shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (revision 1292407) +++ shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (working copy) @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStore; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -82,11 +81,10 @@ return new TUGIAssumingTransportFactory(transFactory, realUgi); } - static TokenStore TOKEN_STORE = new MemoryTokenStore(); - //static TokenStore TOKEN_STORE = new ZooKeeperTokenStore("localhost:2181"); + static DelegationTokenStore TOKEN_STORE = new MemoryTokenStore(); @Override - protected TokenStore getTokenStore(Configuration conf) throws IOException { + protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException { return TOKEN_STORE; } } Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (revision 1292407) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (working copy) @@ -213,9 +213,11 @@ "hive.cluster.delegation.token.store.class"; public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = "hive.cluster.delegation.token.store.zookeeper.connectString"; - public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE = - "hive.cluster.delegation.token.store.zookeeper.rootNode"; - public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT = + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hive/cluster/delegation"; public Server() throws TTransportException { @@ -291,16 +293,16 @@ return new TUGIAssumingProcessor(processor, secretManager); } - protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf) + protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException { String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); if (StringUtils.isBlank(tokenStoreClassName)) { return new MemoryTokenStore(); } try { - Class storeClass = Class + Class storeClass = Class .forName(tokenStoreClassName).asSubclass( - TokenStoreDelegationTokenSecretManager.TokenStore.class); + DelegationTokenStore.class); return ReflectionUtils.newInstance(storeClass, conf); } catch (ClassNotFoundException e) { throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (revision 1292407) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (working copy) @@ -19,33 +19,36 @@ package org.apache.hadoop.hive.thrift; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * ZooKeeper token store implementation. */ -public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore { +public class ZooKeeperTokenStore implements DelegationTokenStore { private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); - private static final String ZK_SEQ_FORMAT = "%010d"; + protected static final String ZK_SEQ_FORMAT = "%010d"; private static final String NODE_KEYS = "/keys"; private static final String NODE_TOKENS = "/tokens"; @@ -53,7 +56,9 @@ private volatile ZooKeeper zkSession; private String zkConnectString; private final int zkSessionTimeout = 3000; + private List newNodeAcl = Ids.OPEN_ACL_UNSAFE; + private class ZooKeeperWatcher implements Watcher { public void process(org.apache.zookeeper.WatchedEvent event) { LOGGER.info(event.toString()); @@ -88,7 +93,7 @@ zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout, new ZooKeeperWatcher()); } catch (IOException ex) { - throw new TokenStoreError("Token store error.", ex); + throw new TokenStoreException("Token store error.", ex); } } } @@ -96,14 +101,23 @@ return zkSession; } - private static String ensurePath(ZooKeeper zk, String path) throws KeeperException, + /** + * Create a path if it does not already exist ("mkdir -p") + * @param zk ZooKeeper session + * @param path string with '/' separator + * @param acl list of ACL entries + * @return + * @throws KeeperException + * @throws InterruptedException + */ + public static String ensurePath(ZooKeeper zk, String path, List acl) throws KeeperException, InterruptedException { String[] pathComps = StringUtils.splitByWholeSeparator(path, "/"); String currentPath = ""; for (String pathComp : pathComps) { currentPath += "/" + pathComp; try { - String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE, + String node = zk.create(currentPath, new byte[0], acl, CreateMode.PERSISTENT); LOGGER.info("Created path: " + node); } catch (KeeperException.NodeExistsException e) { @@ -112,6 +126,67 @@ return currentPath; } + /** + * Parse ACL permission string, from ZooKeeperMain private method + * @param permString + * @return + */ + public static int getPermFromString(String permString) { + int perm = 0; + for (int i = 0; i < permString.length(); i++) { + switch (permString.charAt(i)) { + case 'r': + perm |= ZooDefs.Perms.READ; + break; + case 'w': + perm |= ZooDefs.Perms.WRITE; + break; + case 'c': + perm |= ZooDefs.Perms.CREATE; + break; + case 'd': + perm |= ZooDefs.Perms.DELETE; + break; + case 'a': + perm |= ZooDefs.Perms.ADMIN; + break; + default: + LOGGER.error("Unknown perm type: " + permString.charAt(i)); + } + } + return perm; + } + + /** + * Parse comma separated list of ACL entries to secure generated nodes, e.g. + * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa + * @param aclString + * @return ACL list + */ + public static List parseACLs(String aclString) { + String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); + List acl = new ArrayList(aclComps.length); + for (String a : aclComps) { + if (StringUtils.isBlank(a)) { + continue; + } + a = a.trim(); + // from ZooKeeperMain private method + int firstColon = a.indexOf(':'); + int lastColon = a.lastIndexOf(':'); + if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { + LOGGER.error(a + " does not have the form scheme:id:perm"); + continue; + } + ACL newAcl = new ACL(); + newAcl.setId(new Id(a.substring(0, firstColon), a.substring( + firstColon + 1, lastColon))); + newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); + acl.add(newAcl); + } + return acl; + } + private void init() { if (this.zkConnectString == null) { throw new IllegalStateException("Not initialized"); @@ -127,22 +202,27 @@ ZooKeeper zk = getSession(); try { - ensurePath(zk, rootNode + NODE_KEYS); - ensurePath(zk, rootNode + NODE_TOKENS); + ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl); + ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl); } catch (Exception e) { - throw new TokenStoreError("Failed to validate token path.", e); + throw new TokenStoreException("Failed to validate token path.", e); } } @Override public void setConf(Configuration conf) { - if (conf != null) { - this.zkConnectString = conf.get( - HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); - this.rootNode = conf.get( - HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE, - HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT); + if (conf == null) { + throw new IllegalArgumentException("conf is null"); } + this.zkConnectString = conf.get( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + this.rootNode = conf.get( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT); + String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); + if (StringUtils.isNotBlank(csv)) { + this.newNodeAcl = parseACLs(csv); + } init(); } @@ -176,14 +256,14 @@ public int addMasterKey(String s) { try { ZooKeeper zk = getSession(); - String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE, + String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl, CreateMode.PERSISTENT_SEQUENTIAL); LOGGER.info("Added key {}", newNode); return getSeq(newNode); } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -194,9 +274,9 @@ zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(), -1); } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -209,9 +289,9 @@ } catch (KeeperException.NoNodeException ex) { return false; } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -226,9 +306,9 @@ } return result; } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -238,7 +318,7 @@ return rootNode + NODE_TOKENS + "/" + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); } catch (IOException ex) { - throw new TokenStoreError("Failed to encode token identifier", ex); + throw new TokenStoreException("Failed to encode token identifier", ex); } } @@ -249,15 +329,15 @@ ZooKeeper zk = getSession(); byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); String newNode = zk.create(getTokenPath(tokenIdentifier), - tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + tokenBytes, newNodeAcl, CreateMode.PERSISTENT); LOGGER.info("Added token: {}", newNode); return true; } catch (KeeperException.NodeExistsException ex) { return false; } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -270,9 +350,9 @@ } catch (KeeperException.NoNodeException ex) { return false; } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -284,14 +364,14 @@ try { return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); } catch (Exception ex) { - throw new TokenStoreError("Failed to decode token", ex); + throw new TokenStoreException("Failed to decode token", ex); } } catch (KeeperException.NoNodeException ex) { return null; } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } } @@ -302,9 +382,9 @@ try { nodes = getSession().getChildren(containerNode, false); } catch (KeeperException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } catch (InterruptedException ex) { - throw new TokenStoreError(ex); + throw new TokenStoreException(ex); } List result = new java.util.ArrayList( nodes.size()); Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (revision 1292407) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (working copy) @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.thrift; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -27,29 +30,31 @@ /** * Default in-memory token store implementation. */ -public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore { +public class MemoryTokenStore implements DelegationTokenStore { - private final java.util.concurrent.ConcurrentHashMap masterKeys - = new java.util.concurrent.ConcurrentHashMap(); + private final Map masterKeys + = new ConcurrentHashMap(); - private final java.util.concurrent.ConcurrentHashMap tokens - = new java.util.concurrent.ConcurrentHashMap(); + private final ConcurrentHashMap tokens + = new ConcurrentHashMap(); private final AtomicInteger masterKeySeq = new AtomicInteger(); + private Configuration conf; @Override public void setConf(Configuration conf) { + this.conf = conf; } @Override public Configuration getConf() { - return null; + return this.conf; } @Override public int addMasterKey(String s) { int keySeq = masterKeySeq.getAndIncrement(); - masterKeys.putIfAbsent(keySeq, s); + masterKeys.put(keySeq, s); return keySeq; } @@ -88,7 +93,7 @@ @Override public List getAllDelegationTokenIdentifiers() { - List result = new java.util.ArrayList( + List result = new ArrayList( tokens.size()); for (DelegationTokenIdentifier id : tokens.keySet()) { result.add(id); Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (revision 0) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (revision 0) @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.thrift; + +import java.util.List; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; + +/** + * Interface for pluggable token store that can be implemented with shared external + * storage for load balancing and high availability (for example using ZooKeeper). + * Internal, store specific errors are translated into {@link TokenStoreException}. + */ +public interface DelegationTokenStore extends Configurable { + + /** + * Exception for internal token store errors that typically cannot be handled by the caller. + */ + public static class TokenStoreException extends RuntimeException { + private static final long serialVersionUID = -8693819817623074083L; + + public TokenStoreException(Throwable cause) { + super(cause); + } + + public TokenStoreException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Add new master key. The token store assigns and returns the sequence number. + * Caller needs to use the identifier to update the key (since it is embedded in the key). + * + * @param s + * @return sequence number for new key + */ + int addMasterKey(String s) throws TokenStoreException; + + /** + * Update master key (for expiration and setting store assigned sequence within key) + * @param keySeq + * @param s + * @throws TokenStoreException + */ + void updateMasterKey(int keySeq, String s) throws TokenStoreException; + + /** + * Remove key for given id. + * @param keySeq + * @return false if key no longer present, true otherwise. + */ + boolean removeMasterKey(int keySeq); + + /** + * Return all master keys. + * @return + * @throws TokenStoreException + */ + String[] getMasterKeys() throws TokenStoreException; + + /** + * Add token. If identifier is already present, token won't be added. + * @param tokenIdentifier + * @param token + * @return true if token was added, false for existing identifier + */ + boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) throws TokenStoreException; + + /** + * Get token. Returns null if the token does not exist. + * @param tokenIdentifier + * @return + */ + DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) + throws TokenStoreException; + + /** + * Remove token. Return value can be used by caller to detect concurrency. + * @param tokenIdentifier + * @return true if token was removed, false if it was already removed. + * @throws TokenStoreException + */ + boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; + + /** + * List of all token identifiers in the store. This is used to remove expired tokens + * and a potential scalability improvement would be to partition by master key id + * @return + */ + List getAllDelegationTokenIdentifiers(); + +} Index: shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java =================================================================== --- shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (revision 1292407) +++ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (working copy) @@ -25,12 +25,12 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; @@ -58,88 +58,16 @@ private static final Logger LOGGER = LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName()); - /** - * Exception for internal token store errors that typically cannot be handled by the caller. - */ - public static class TokenStoreError extends RuntimeException { - private static final long serialVersionUID = -8693819817623074083L; - - public TokenStoreError(Throwable cause) { - super(cause); - } - - public TokenStoreError(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Interface for pluggable token store that can be implemented as shared store with external - * storage (for example with ZooKeeper for HA). - * Internal, store specific errors are translated into {@link TokenStoreError}. - */ - public static interface TokenStore extends Configurable { - /** - * Add new master key. The token store assigns and returns the sequence number. - * Caller needs to use the identifier to update the key (since it is embedded in the key). - * - * @param s - * @return sequence number for new key - */ - int addMasterKey(String s) throws TokenStoreError; - - void updateMasterKey(int keySeq, String s) throws TokenStoreError; - - /** - * Remove key for given id. - * @param keySeq - * @return false if key no longer present, true otherwise. - */ - boolean removeMasterKey(int keySeq); - - String[] getMasterKeys() throws TokenStoreError; - - /** - * Add token. If identifier is already present, token won't be added. - * @param tokenIdentifier - * @param token - * @return true if token was added, false for existing identifier - */ - boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) throws TokenStoreError; - - /** - * Get token. Returns null if the token does not exist. - * @param tokenIdentifier - * @return - */ - DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) - throws TokenStoreError; - - /** - * Remove token. Ignores token does not exist. - * @param tokenIdentifier - */ - boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError; - - /** - * List of all token identifiers in the store. This is used to remove expired tokens - * and a potential scalability improvement would be to partition by master key id - * @return - */ - List getAllDelegationTokenIdentifiers(); - - } - final private long keyUpdateInterval; final private long tokenRemoverScanInterval; private Thread tokenRemoverThread; - final private TokenStore tokenStore; + final private DelegationTokenStore tokenStore; public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval, TokenStore sharedStore) { + long delegationTokenRemoverScanInterval, + DelegationTokenStore sharedStore) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); this.keyUpdateInterval = delegationKeyUpdateInterval; @@ -162,7 +90,7 @@ // read keys from token store String[] allKeys = tokenStore.getMasterKeys(); Map keys - = new java.util.HashMap(allKeys.length); + = new HashMap(allKeys.length); for (String keyStr : allKeys) { DelegationKey key = new DelegationKey(); try { @@ -180,8 +108,7 @@ } @Override - public byte[] retrievePassword(DelegationTokenIdentifier identifier) - throws org.apache.hadoop.security.token.SecretManager.InvalidToken { + public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken { DelegationTokenInformation info = this.tokenStore.getToken(identifier); if (info == null) { throw new InvalidToken("token expired or does not exist: " + identifier); Index: conf/hive-default.xml.template =================================================================== --- conf/hive-default.xml.template (revision 1292407) +++ conf/hive-default.xml.template (working copy) @@ -754,6 +754,30 @@ + hive.cluster.delegation.token.store.class + org.apache.hadoop.hive.thrift.MemoryTokenStore + The delegation token store implementation. Set to org.apache.hadoop.hive.thrift.ZooKeeperTokenStore for load-balanced cluster. + + + + hive.cluster.delegation.token.store.zookeeper.connectString + localhost:2181 + The ZooKeeper token store connect string. + + + + hive.cluster.delegation.token.store.zookeeper.znode + /hive/cluster/delegation + The root path for token store data. + + + + hive.cluster.delegation.token.store.zookeeper.acl + sasl:hive/host1@EXAMPLE.COM:cdrwa,sasl:hive/host2@EXAMPLE.COM:cdrwa + ACL for token store entries. List comma separated all server principals for the cluster. + + + hive.metastore.cache.pinobjtypes Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order List of comma separated metastore object types that should be pinned in the cache Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1292407) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -263,6 +263,15 @@ METASTORE_KERBEROS_PRINCIPAL("hive.metastore.kerberos.principal", "hive-metastore/_HOST@EXAMPLE.COM"), METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled", false), + METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS( + "hive.cluster.delegation.token.store.class", + "org.apache.hadoop.hive.thrift.MemoryTokenStore"), + METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_CONNECTSTR( + "hive.cluster.delegation.token.store.zookeeper.connectString", ""), + METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ZNODE( + "hive.cluster.delegation.token.store.zookeeper.znode", "/hive/cluster/delegation"), + METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ACL( + "hive.cluster.delegation.token.store.zookeeper.acl", ""), METASTORE_CACHE_PINOBJTYPES("hive.metastore.cache.pinobjtypes", "Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"), METASTORE_CONNECTION_POOLING_TYPE("datanucleus.connectionPoolingType", "DBCP"), METASTORE_VALIDATE_TABLES("datanucleus.validateTables", false),