Index: shims/ivy.xml =================================================================== --- shims/ivy.xml (revision 1202918) +++ shims/ivy.xml (working copy) @@ -33,6 +33,10 @@ + + + ipList = new ArrayList(); @@ -107,7 +118,7 @@ conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName), builder.toString()); } - + public void setup(final int port) throws Exception { System.setProperty(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true"); @@ -120,28 +131,105 @@ MetaStoreUtils.startMetaStore(port, new MyHadoopThriftAuthBridge20S()); } + /** + * Test delegation token store/load from shared store. + * @throws Exception + */ + public void testDelegationTokenSharedStore() throws Exception { + UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser(); + + TokenStoreDelegationTokenSecretManager tokenManager = + new TokenStoreDelegationTokenSecretManager(0, 60*60*1000, 60*60*1000, 0, + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE); + // initializes current key + tokenManager.startThreads(); + tokenManager.stopThreads(); + + String tokenStrForm = tokenManager.getDelegationToken(clientUgi.getShortUserName()); + Token t= new Token(); + t.decodeFromUrlString(tokenStrForm); + + //check whether the username in the token is what we expect + DelegationTokenIdentifier d = new DelegationTokenIdentifier(); + d.readFields(new DataInputStream(new ByteArrayInputStream( + t.getIdentifier()))); + assertTrue("Usernames don't match", + clientUgi.getShortUserName().equals(d.getUser().getShortUserName())); + + DelegationTokenInformation tokenInfo = MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE + .getToken(d); + assertNotNull("token not in store", tokenInfo); + assertFalse("duplicate token add", + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.addToken(d, tokenInfo)); + + // check keys are copied from token store when token is loaded + TokenStoreDelegationTokenSecretManager anotherManager = + new TokenStoreDelegationTokenSecretManager(0, 0, 0, 0, + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE); + assertEquals("master keys empty on init", 0, + anotherManager.getAllKeys().length); + assertNotNull("token loaded", + anotherManager.retrievePassword(d)); + anotherManager.renewToken(t, clientUgi.getShortUserName()); + assertEquals("master keys not loaded from store", + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getMasterKeys().length, + anotherManager.getAllKeys().length); + + // cancel the delegation token + tokenManager.cancelDelegationToken(tokenStrForm); + assertNull("token not removed from store after cancel", + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d)); + assertFalse("token removed (again)", + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.removeToken(d)); + try { + anotherManager.retrievePassword(d); + fail("InvalidToken expected after cancel"); + } catch (InvalidToken ex) { + // expected + } + + // token expiration + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.addToken(d, + new DelegationTokenInformation(0, t.getPassword())); + assertNotNull(MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d)); + anotherManager.removeExpiredTokens(); + assertNull("Expired token not removed", + MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d)); + + // key expiration - create an already expired key + anotherManager.startThreads(); // generates initial key + anotherManager.stopThreads(); + DelegationKey expiredKey = new DelegationKey(-1, 0, anotherManager.getAllKeys()[0].getKey()); + anotherManager.logUpdateMasterKey(expiredKey); // updates key with sequence number + assertTrue("expired key not in allKeys", + anotherManager.reloadKeys().containsKey(expiredKey.getKeyId())); + anotherManager.rollMasterKeyExt(); + assertFalse("Expired key not removed", + anotherManager.reloadKeys().containsKey(expiredKey.getKeyId())); + } + public void testSaslWithHiveMetaStore() throws Exception { setup(10000); UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser(); obtainTokenAndAddIntoUGI(clientUgi, null); obtainTokenAndAddIntoUGI(clientUgi, "tokenForFooTablePartition"); } - + public void testMetastoreProxyUser() throws Exception { setup(10010); - + final String proxyUserName = "proxyUser"; - //set the configuration up such that proxyUser can act on + //set the configuration up such that proxyUser can act on //behalf of all users belonging to the group foo_bar_group ( //a dummy group) String[] groupNames = new String[] { "foo_bar_group" }; setGroupsInConf(groupNames, proxyUserName); - - final UserGroupInformation delegationTokenUser = + + final UserGroupInformation delegationTokenUser = UserGroupInformation.getCurrentUser(); - - final UserGroupInformation proxyUserUgi = + + final UserGroupInformation proxyUserUgi = UserGroupInformation.createRemoteUser(proxyUserName); String tokenStrForm = proxyUserUgi.doAs(new PrivilegedExceptionAction() { public String run() throws Exception { @@ -154,11 +242,11 @@ } } }); - assertTrue("Expected the getDelegationToken call to fail", + assertTrue("Expected the getDelegationToken call to fail", tokenStrForm == null); - - //set the configuration up such that proxyUser can act on - //behalf of all users belonging to the real group(s) that the + + //set the configuration up such that proxyUser can act on + //behalf of all users belonging to the real group(s) that the //user running the test belongs to setGroupsInConf(UserGroupInformation.getCurrentUser().getGroupNames(), proxyUserName); @@ -173,7 +261,7 @@ } } }); - assertTrue("Expected the getDelegationToken call to not fail", + assertTrue("Expected the getDelegationToken call to not fail", tokenStrForm != null); Token t= new Token(); t.decodeFromUrlString(tokenStrForm); @@ -181,12 +269,12 @@ DelegationTokenIdentifier d = new DelegationTokenIdentifier(); d.readFields(new DataInputStream(new ByteArrayInputStream( t.getIdentifier()))); - assertTrue("Usernames don't match", + assertTrue("Usernames don't match", delegationTokenUser.getShortUserName().equals(d.getUser().getShortUserName())); - + } - - private void setGroupsInConf(String[] groupNames, String proxyUserName) + + private void setGroupsInConf(String[] groupNames, String proxyUserName) throws IOException { conf.set( ProxyUsers.getProxySuperuserGroupConfKey(proxyUserName), @@ -194,21 +282,21 @@ configureSuperUserIPAddresses(conf, proxyUserName); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); } - - private String getDelegationTokenStr(UserGroupInformation ownerUgi, + + private String getDelegationTokenStr(UserGroupInformation ownerUgi, UserGroupInformation realUgi) throws Exception { //obtain a token by directly invoking the metastore operation(without going //through the thrift interface). Obtaining a token makes the secret manager //aware of the user and that it gave the token to the user - //also set the authentication method explicitly to KERBEROS. Since the + //also set the authentication method explicitly to KERBEROS. Since the //metastore checks whether the authentication method is KERBEROS or not - //for getDelegationToken, and the testcases don't use + //for getDelegationToken, and the testcases don't use //kerberos, this needs to be done HadoopThriftAuthBridge20S.Server.authenticationMethod .set(AuthenticationMethod.KERBEROS); - HadoopThriftAuthBridge20S.Server.remoteAddress.set(InetAddress.getLocalHost()); + HadoopThriftAuthBridge20S.Server.remoteAddress.set(InetAddress.getLocalHost()); return - HiveMetaStore.getDelegationToken(ownerUgi.getShortUserName(), + HiveMetaStore.getDelegationToken(ownerUgi.getShortUserName(), realUgi.getShortUserName()); } @@ -217,14 +305,14 @@ String tokenStrForm = getDelegationTokenStr(clientUgi, clientUgi); Token t= new Token(); t.decodeFromUrlString(tokenStrForm); - + //check whether the username in the token is what we expect DelegationTokenIdentifier d = new DelegationTokenIdentifier(); d.readFields(new DataInputStream(new ByteArrayInputStream( t.getIdentifier()))); - assertTrue("Usernames don't match", + assertTrue("Usernames don't match", clientUgi.getShortUserName().equals(d.getUser().getShortUserName())); - + if (tokenSig != null) { conf.set("hive.metastore.token.signature", tokenSig); t.setService(new Text(tokenSig)); @@ -247,7 +335,7 @@ //try out some metastore operations createDBAndVerifyExistence(hiveClient); - + //check that getDelegationToken fails since we are not authenticating //over kerberos boolean pass = false; Index: shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java =================================================================== --- shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (revision 0) +++ shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (revision 0) @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.security.token.delegation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; + +/** + * Workaround for serialization of {@link DelegationTokenInformation} through package access. + * Future version of Hadoop should add this to DelegationTokenInformation itself. + */ +public final class HiveDelegationTokenSupport { + + private HiveDelegationTokenSupport() {} + + public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bos); + WritableUtils.writeVInt(out, token.password.length); + out.write(token.password); + out.writeLong(token.renewDate); + out.flush(); + return bos.toByteArray(); + } catch (IOException ex) { + throw new RuntimeException("Failed to encode token.", ex); + } + } + + public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes) + throws IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes)); + DelegationTokenInformation token = new DelegationTokenInformation(0, null); + int len = WritableUtils.readVInt(in); + token.password = new byte[len]; + in.readFully(token.password); + token.renewDate = in.readLong(); + return token; + } + + public static void rollMasterKey( + AbstractDelegationTokenSecretManager mgr) + throws IOException { + mgr.rollMasterKey(); + } + +} Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java =================================================================== --- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (revision 1202918) +++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (working copy) @@ -35,6 +35,7 @@ import javax.security.sasl.SaslServer; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,6 +49,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocol; @@ -330,6 +332,14 @@ "hive.cluster.delegation.token.max-lifetime"; public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days + public static final String DELEGATION_TOKEN_STORE_CLS = + "hive.cluster.delegation.token.store.class"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; + public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE = + "hive.cluster.delegation.token.store.zookeeper.rootNode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT = + "/hive/cluster/delegation"; public Server() throws TTransportException { try { @@ -404,6 +414,23 @@ return new TUGIAssumingProcessor(processor, secretManager); } + protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf) + throws IOException { + String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); + if (StringUtils.isBlank(tokenStoreClassName)) { + return new MemoryTokenStore(); + } + try { + Class storeClass = Class + .forName(tokenStoreClassName).asSubclass( + TokenStoreDelegationTokenSecretManager.TokenStore.class); + return ReflectionUtils.newInstance(storeClass, conf); + } catch (ClassNotFoundException e) { + throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, + e); + } + } + @Override public void startDelegationTokenSecretManager(Configuration conf) throws IOException{ @@ -416,11 +443,11 @@ long tokenRenewInterval = conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - secretManager = - new DelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, - tokenRenewInterval, - DELEGATION_TOKEN_GC_INTERVAL); + + secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, + tokenMaxLifetime, + tokenRenewInterval, + DELEGATION_TOKEN_GC_INTERVAL, getTokenStore(conf)); secretManager.startThreads(); } Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java =================================================================== --- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (revision 0) +++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (revision 0) @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.thrift; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper token store implementation. + */ +public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); + + private static final String ZK_SEQ_FORMAT = "%010d"; + private static final String NODE_KEYS = "/keys"; + private static final String NODE_TOKENS = "/tokens"; + + private String rootNode = ""; + private volatile ZooKeeper zkSession; + private String zkConnectString; + private final int zkSessionTimeout = 3000; + + private class ZooKeeperWatcher implements Watcher { + public void process(org.apache.zookeeper.WatchedEvent event) { + LOGGER.info(event.toString()); + if (event.getState() == Watcher.Event.KeeperState.Expired) { + LOGGER.warn("ZooKeeper session expired, discarding connection"); + try { + zkSession.close(); + } catch (Throwable e) { + LOGGER.warn("Failed to close connection on expired session", e); + } + } + } + } + + /** + * Default constructor for dynamic instantiation w/ Configurable + * (ReflectionUtils does not support Configuration constructor injection). + */ + protected ZooKeeperTokenStore() { + } + + public ZooKeeperTokenStore(String hostPort) { + this.zkConnectString = hostPort; + init(); + } + + private ZooKeeper getSession() { + if (zkSession == null || zkSession.getState() == States.CLOSED) { + synchronized (this) { + if (zkSession == null || zkSession.getState() == States.CLOSED) { + try { + zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout, + new ZooKeeperWatcher()); + } catch (IOException ex) { + throw new TokenStoreError("Token store error.", ex); + } + } + } + } + return zkSession; + } + + private static String ensurePath(ZooKeeper zk, String path) throws KeeperException, + InterruptedException { + String[] pathComps = StringUtils.splitByWholeSeparator(path, "/"); + String currentPath = ""; + for (String pathComp : pathComps) { + currentPath += "/" + pathComp; + try { + String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + LOGGER.info("Created path: " + node); + } catch (KeeperException.NodeExistsException e) { + } + } + return currentPath; + } + + private void init() { + if (this.zkConnectString == null) { + throw new IllegalStateException("Not initialized"); + } + + if (this.zkSession != null) { + try { + this.zkSession.close(); + } catch (InterruptedException ex) { + LOGGER.warn("Failed to close existing session.", ex); + } + } + + ZooKeeper zk = getSession(); + try { + ensurePath(zk, rootNode + NODE_KEYS); + ensurePath(zk, rootNode + NODE_TOKENS); + } catch (Exception e) { + throw new TokenStoreError("Failed to validate token path.", e); + } + } + + @Override + public void setConf(Configuration conf) { + if (conf != null) { + this.zkConnectString = conf.get( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + this.rootNode = conf.get( + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE, + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT); + } + init(); + } + + @Override + public Configuration getConf() { + return null; // not required + } + + private Map getAllKeys() throws KeeperException, + InterruptedException { + + String masterKeyNode = rootNode + NODE_KEYS; + ZooKeeper zk = getSession(); + List nodes = zk.getChildren(masterKeyNode, false); + Map result = new HashMap(); + for (String node : nodes) { + byte[] data = zk.getData(masterKeyNode + "/" + node, false, null); + if (data != null) { + result.put(getSeq(node), data); + } + } + return result; + } + + private int getSeq(String path) { + String[] pathComps = path.split("/"); + return Integer.parseInt(pathComps[pathComps.length-1]); + } + + @Override + public int addMasterKey(String s) { + try { + ZooKeeper zk = getSession(); + String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + LOGGER.info("Added key {}", newNode); + return getSeq(newNode); + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + @Override + public void updateMasterKey(int keySeq, String s) { + try { + ZooKeeper zk = getSession(); + zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(), + -1); + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + @Override + public boolean removeMasterKey(int keySeq) { + try { + ZooKeeper zk = getSession(); + zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1); + return true; + } catch (KeeperException.NoNodeException ex) { + return false; + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + @Override + public String[] getMasterKeys() { + try { + Map allKeys = getAllKeys(); + String[] result = new String[allKeys.size()]; + int resultIdx = 0; + for (byte[] keyBytes : allKeys.values()) { + result[resultIdx++] = new String(keyBytes); + } + return result; + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + + private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { + try { + return rootNode + NODE_TOKENS + "/" + + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + } catch (IOException ex) { + throw new TokenStoreError("Failed to encode token identifier", ex); + } + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + try { + ZooKeeper zk = getSession(); + byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); + String newNode = zk.create(getTokenPath(tokenIdentifier), + tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOGGER.info("Added token: {}", newNode); + return true; + } catch (KeeperException.NodeExistsException ex) { + return false; + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + try { + ZooKeeper zk = getSession(); + zk.delete(getTokenPath(tokenIdentifier), -1); + return true; + } catch (KeeperException.NoNodeException ex) { + return false; + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + try { + ZooKeeper zk = getSession(); + byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null); + try { + return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception ex) { + throw new TokenStoreError("Failed to decode token", ex); + } + } catch (KeeperException.NoNodeException ex) { + return null; + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + } + + @Override + public List getAllDelegationTokenIdentifiers() { + String containerNode = rootNode + NODE_TOKENS; + final List nodes; + try { + nodes = getSession().getChildren(containerNode, false); + } catch (KeeperException ex) { + throw new TokenStoreError(ex); + } catch (InterruptedException ex) { + throw new TokenStoreError(ex); + } + List result = new java.util.ArrayList( + nodes.size()); + for (String node : nodes) { + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); + result.add(id); + } catch (Exception e) { + LOGGER.warn("Failed to decode token '{}'", node); + } + } + return result; + } + +} Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java =================================================================== --- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (revision 0) +++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (revision 0) @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.thrift; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; + +/** + * Default in-memory token store implementation. + */ +public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore { + + private final java.util.concurrent.ConcurrentHashMap masterKeys + = new java.util.concurrent.ConcurrentHashMap(); + + private final java.util.concurrent.ConcurrentHashMap tokens + = new java.util.concurrent.ConcurrentHashMap(); + + private final AtomicInteger masterKeySeq = new AtomicInteger(); + + @Override + public void setConf(Configuration conf) { + } + + @Override + public Configuration getConf() { + return null; + } + + @Override + public int addMasterKey(String s) { + int keySeq = masterKeySeq.getAndIncrement(); + masterKeys.putIfAbsent(keySeq, s); + return keySeq; + } + + @Override + public void updateMasterKey(int keySeq, String s) { + masterKeys.put(keySeq, s); + } + + @Override + public boolean removeMasterKey(int keySeq) { + return masterKeys.remove(keySeq) != null; + } + + @Override + public String[] getMasterKeys() { + return masterKeys.values().toArray(new String[0]); + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token); + return (tokenInfo == null); + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier); + return tokenInfo != null; + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + return tokens.get(tokenIdentifier); + } + + @Override + public List getAllDelegationTokenIdentifiers() { + List result = new java.util.ArrayList( + tokens.size()); + for (DelegationTokenIdentifier id : tokens.keySet()) { + result.add(id); + } + return result; + } + +} Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java =================================================================== --- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (revision 0) +++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (revision 0) @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.thrift; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory + * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.). + * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not + * cached in memory. This avoids complexities related to token expiration. The security token is + * needed only at the time the transport is opened (as opposed to per interface operation). The + * assumption therefore is low cost of interprocess token retrieval (for random read efficient store + * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches. + * The wrapper incorporates the token store abstraction within the limitations of current + * Hive/Hadoop dependency (.20S) with minimum code duplication. + * Eventually this should be supported by Hadoop security directly. + */ +public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName()); + + /** + * Exception for internal token store errors that typically cannot be handled by the caller. + */ + public static class TokenStoreError extends RuntimeException { + private static final long serialVersionUID = -8693819817623074083L; + + public TokenStoreError(Throwable cause) { + super(cause); + } + + public TokenStoreError(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Interface for pluggable token store that can be implemented as shared store with external + * storage (for example with ZooKeeper for HA). + * Internal, store specific errors are translated into {@link TokenStoreError}. + */ + public static interface TokenStore extends Configurable { + /** + * Add new master key. The token store assigns and returns the sequence number. + * Caller needs to use the identifier to update the key (since it is embedded in the key). + * + * @param s + * @return sequence number for new key + */ + int addMasterKey(String s) throws TokenStoreError; + + void updateMasterKey(int keySeq, String s) throws TokenStoreError; + + /** + * Remove key for given id. + * @param keySeq + * @return false if key no longer present, true otherwise. + */ + boolean removeMasterKey(int keySeq); + + String[] getMasterKeys() throws TokenStoreError; + + /** + * Add token. If identifier is already present, token won't be added. + * @param tokenIdentifier + * @param token + * @return true if token was added, false for existing identifier + */ + boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) throws TokenStoreError; + + /** + * Get token. Returns null if the token does not exist. + * @param tokenIdentifier + * @return + */ + DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) + throws TokenStoreError; + + /** + * Remove token. Ignores token does not exist. + * @param tokenIdentifier + */ + boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError; + + /** + * List of all token identifiers in the store. This is used to remove expired tokens + * and a potential scalability improvement would be to partition by master key id + * @return + */ + List getAllDelegationTokenIdentifiers(); + + } + + final private long keyUpdateInterval; + final private long tokenRemoverScanInterval; + private Thread tokenRemoverThread; + + final private TokenStore tokenStore; + + public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, TokenStore sharedStore) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, + delegationTokenRemoverScanInterval); + this.keyUpdateInterval = delegationKeyUpdateInterval; + this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; + + this.tokenStore = sharedStore; + } + + protected DelegationTokenIdentifier getTokenIdentifier(Token token) + throws IOException { + // turn bytes back into identifier for cache lookup + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + return id; + } + + protected Map reloadKeys() { + // read keys from token store + String[] allKeys = tokenStore.getMasterKeys(); + Map keys + = new java.util.HashMap(allKeys.length); + for (String keyStr : allKeys) { + DelegationKey key = new DelegationKey(); + try { + decodeWritable(key, keyStr); + keys.put(key.getKeyId(), key); + } catch (IOException ex) { + LOGGER.error("Failed to load master key.", ex); + } + } + synchronized (this) { + super.allKeys.clear(); + super.allKeys.putAll(keys); + } + return keys; + } + + @Override + public byte[] retrievePassword(DelegationTokenIdentifier identifier) + throws org.apache.hadoop.security.token.SecretManager.InvalidToken { + DelegationTokenInformation info = this.tokenStore.getToken(identifier); + if (info == null) { + throw new InvalidToken("token expired or does not exist: " + identifier); + } + // must reuse super as info.getPassword is not accessible + synchronized (this) { + try { + super.currentTokens.put(identifier, info); + return super.retrievePassword(identifier); + } finally { + super.currentTokens.remove(identifier); + } + } + } + + @Override + public DelegationTokenIdentifier cancelToken(Token token, + String canceller) throws IOException { + DelegationTokenIdentifier id = getTokenIdentifier(token); + LOGGER.info("Token cancelation requested for identifier: "+id); + this.tokenStore.removeToken(id); + return id; + } + + /** + * Create the password and add it to shared store. + */ + @Override + protected byte[] createPassword(DelegationTokenIdentifier id) { + byte[] password; + DelegationTokenInformation info; + synchronized (this) { + password = super.createPassword(id); + // add new token to shared store + // need to persist expiration along with password + info = super.currentTokens.remove(id); + if (info == null) { + throw new IllegalStateException("Failed to retrieve token after creation"); + } + } + this.tokenStore.addToken(id, info); + return password; + } + + @Override + public long renewToken(Token token, + String renewer) throws InvalidToken, IOException { + // since renewal is KERBEROS authenticated token may not be cached + final DelegationTokenIdentifier id = getTokenIdentifier(token); + DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id); + if (tokenInfo == null) { + throw new InvalidToken("token does not exist: " + id); // no token found + } + // ensure associated master key is available + if (!super.allKeys.containsKey(id.getMasterKeyId())) { + LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.", + id.getMasterKeyId()); + reloadKeys(); + } + // reuse super renewal logic + synchronized (this) { + super.currentTokens.put(id, tokenInfo); + try { + return super.renewToken(token, renewer); + } finally { + super.currentTokens.remove(id); + } + } + } + + public static String encodeWritable(Writable key) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + key.write(dos); + dos.flush(); + return Base64.encodeBase64URLSafeString(bos.toByteArray()); + } + + public static void decodeWritable(Writable w, String idStr) throws IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr))); + w.readFields(in); + } + + /** + * Synchronize master key updates / sequence generation for multiple nodes. + * NOTE: {@Link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need + * to utilize this "hook" to manipulate the key through the object reference. + * This .20S workaround should cease to exist when Hadoop supports token store. + */ + @Override + protected void logUpdateMasterKey(DelegationKey key) throws IOException { + int keySeq = this.tokenStore.addMasterKey(encodeWritable(key)); + // update key with assigned identifier + DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey()); + String keyStr = encodeWritable(keyWithSeq); + this.tokenStore.updateMasterKey(keySeq, keyStr); + decodeWritable(key, keyStr); + LOGGER.info("New master key with key id={}", key.getKeyId()); + super.logUpdateMasterKey(key); + } + + @Override + public synchronized void startThreads() throws IOException { + try { + // updateCurrentKey needs to be called to initialize the master key + // (there should be a null check added in the future in rollMasterKey) + // updateCurrentKey(); + Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey"); + m.setAccessible(true); + m.invoke(this); + } catch (Exception e) { + throw new IOException("Failed to initialize master key", e); + } + running = true; + tokenRemoverThread = new Daemon(new ExpiredTokenRemover()); + tokenRemoverThread.start(); + } + + @Override + public synchronized void stopThreads() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Stopping expired delegation token remover thread"); + } + running = false; + if (tokenRemoverThread != null) { + tokenRemoverThread.interrupt(); + } + } + + /** + * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager} + * that cannot be reused due to private method access. Logic here can more efficiently + * deal with external token store by only loading into memory the minimum data needed. + */ + protected void removeExpiredTokens() { + long now = System.currentTimeMillis(); + Iterator i = tokenStore.getAllDelegationTokenIdentifiers() + .iterator(); + while (i.hasNext()) { + DelegationTokenIdentifier id = i.next(); + if (now > id.getMaxDate()) { + this.tokenStore.removeToken(id); // no need to look at token info + } else { + // get token info to check renew date + DelegationTokenInformation tokenInfo = tokenStore.getToken(id); + if (tokenInfo != null) { + if (now > tokenInfo.getRenewDate()) { + this.tokenStore.removeToken(id); + } + } + } + } + } + + /** + * Extension of rollMasterKey to remove expired keys from store. + * @throws IOException + */ + protected void rollMasterKeyExt() throws IOException { + Map keys = reloadKeys(); + int currentKeyId = super.currentId; + HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this); + List keysAfterRoll = Arrays.asList(getAllKeys()); + for (DelegationKey key : keysAfterRoll) { + keys.remove(key.getKeyId()); + if (key.getKeyId() == currentKeyId) { + tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); + } + } + for (DelegationKey expiredKey : keys.values()) { + LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); + tokenStore.removeMasterKey(expiredKey.getKeyId()); + } + } + + + /** + * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access + * restriction (there would not be an need to clone the remove thread if the remove logic was + * protected/extensible). + */ + protected class ExpiredTokenRemover extends Thread { + private long lastMasterKeyUpdate; + private long lastTokenCacheCleanup; + + @Override + public void run() { + LOGGER.info("Starting expired delegation token remover thread, " + + "tokenRemoverScanInterval=" + tokenRemoverScanInterval + / (60 * 1000) + " min(s)"); + try { + while (running) { + long now = System.currentTimeMillis(); + if (lastMasterKeyUpdate + keyUpdateInterval < now) { + try { + rollMasterKeyExt(); + lastMasterKeyUpdate = now; + } catch (IOException e) { + LOGGER.error("Master key updating failed. " + + StringUtils.stringifyException(e)); + } + } + if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) { + removeExpiredTokens(); + lastTokenCacheCleanup = now; + } + try { + Thread.sleep(5000); // 5 seconds + } catch (InterruptedException ie) { + LOGGER + .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread " + + ie); + } + } + } catch (Throwable t) { + LOGGER.error("ExpiredTokenRemover thread received unexpected exception. " + + t, t); + Runtime.getRuntime().exit(-1); + } + } + } + +}