Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(revision 1050190)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(working copy)
@@ -23,6 +23,7 @@
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
+
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -49,8 +50,8 @@
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.tools.HadoopArchives;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -449,4 +450,14 @@
     }
     return ugi;
   }
+
+  @Override
+  public String getTokenStrForm() throws IOException {
+    throw new UnsupportedOperationException("Tokens are not supported in current hadoop version");
+  }
+
+  @Override
+  public String getTokenStrForm(String tokenSignature) throws IOException {
+    throw new UnsupportedOperationException("Tokens are not supported in current hadoop version");
+  }
 }
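The 0.20 shim above deliberately rejects both getTokenStrForm() overloads, since pre-security Hadoop has no delegation tokens. A caller that must run against either shim can treat the UnsupportedOperationException as "no token available". The following sketch is illustrative only and not part of the patch; TokenProbe and tokenStrFormOrNull are made-up names, while ShimLoader and HadoopShims are the real Hive classes:

    import java.io.IOException;

    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class TokenProbe {
      /** Returns the Hive token string form, or null when the shim has no token support. */
      public static String tokenStrFormOrNull() throws IOException {
        HadoopShims shims = ShimLoader.getHadoopShims();
        try {
          return shims.getTokenStrForm(); // throws on the 0.20 (pre-security) shim
        } catch (UnsupportedOperationException e) {
          return null; // no delegation tokens in this Hadoop version
        }
      }
    }
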
Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java	(revision 1050190)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java	(working copy)
@@ -15,21 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hive.thrift;
- import java.io.IOException;
+import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
 import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;
@@ -66,14 +80,27 @@
    * @param serverPrincipal The Kerberos principal of the target server.
    * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
    */
+  @Override
   public TTransport createClientTransport(
     String principalConfig, String host,
-    String methodStr, TTransport underlyingTransport)
+    String methodStr, String tokenStrForm, TTransport underlyingTransport)
     throws IOException {
     AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+    TTransport saslTransport = null;
     switch (method) {
+      case DIGEST:
+        Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+        t.decodeFromUrlString(tokenStrForm);
+        saslTransport = new TSaslClientTransport(
+          method.getMechanismName(),
+          null,
+          null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t),
+          underlyingTransport);
+        return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
       case KERBEROS:
         String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
         String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
@@ -83,7 +110,7 @@
             + serverPrincipal);
         }
         try {
-          TTransport saslTransport = new TSaslClientTransport(
+          saslTransport = new TSaslClientTransport(
             method.getMechanismName(),
             null,
             names[0], names[1],
@@ -95,14 +122,85 @@
         }
 
       default:
-        throw new IOException("Unsupported authentication method: " +method);
+        throw new IOException("Unsupported authentication method: " + method);
     }
   }
+
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    private final String userName;
+    private final char[] userPassword;
+
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      this.userName = encodeIdentifier(token.getIdentifier());
+      this.userPassword = encodePassword(token.getPassword());
+    }
+
+    public void handle(Callback[] callbacks)
+        throws UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL client callback: setting username: " + userName);
+        }
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL client callback: setting userPassword");
+        }
+        pc.setPassword(userPassword);
+      }
+      if (rc != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL client callback: setting realm: "
+              + rc.getDefaultText());
+        }
+        rc.setText(rc.getDefaultText());
+      }
+    }
+
+    static String encodeIdentifier(byte[] identifier) {
+      return new String(Base64.encodeBase64(identifier));
+    }
+
+    static char[] encodePassword(byte[] password) {
+      return new String(Base64.encodeBase64(password)).toCharArray();
+    }
+  }
 }
 
 public static class Server extends HadoopThriftAuthBridge.Server {
   private final UserGroupInformation realUgi;
-
+  DelegationTokenSecretManager secretManager;
+  private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+  // Delegation token related keys
+  public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+    "hive.cluster.delegation.key.update-interval";
+  public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+    24*60*60*1000; // 1 day
+  public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+    "hive.cluster.delegation.token.renew-interval";
+  public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+    24*60*60*1000; // 1 day
+  public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+    "hive.cluster.delegation.token.max-lifetime";
+  public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+    7*24*60*60*1000; // 7 days
 
   /**
    * TODO: javadoc
    */
@@ -143,7 +241,7 @@
     String kerberosName = realUgi.getUserName();
     final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
     if (names.length != 3) {
-      throw new TTransportException("Kerberos principal should have 3 parts: " +kerberosName);
+      throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
     }
 
     TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
@@ -152,6 +250,9 @@
       names[0], names[1],  // two parts of kerberos principal
       SaslRpcServer.SASL_PROPS,
       new SaslRpcServer.SaslGssCallbackHandler());
+    transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+      null, SaslRpcServer.SASL_DEFAULT_REALM,
+      SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(secretManager));
 
     return new TUGIAssumingTransportFactory(transFactory, realUgi);
   }
@@ -163,10 +264,161 @@
    */
   @Override
   public TProcessor wrapProcessor(TProcessor processor) {
-    return new TUGIAssumingProcessor(processor);
+    return new TUGIAssumingProcessor(processor, secretManager);
   }
 
+  @Override
+  public void startDelegationTokenSecretManager(Configuration conf)
+      throws IOException {
+    long secretKeyInterval =
+      conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+                   DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+    long tokenMaxLifetime =
+      conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+                   DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+    long tokenRenewInterval =
+      conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+                   DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+    secretManager =
+      new DelegationTokenSecretManager(secretKeyInterval,
+                                       tokenMaxLifetime,
+                                       tokenRenewInterval,
+                                       DELEGATION_TOKEN_GC_INTERVAL);
+    secretManager.startThreads();
+  }
+
+  @Override
+  public String getDelegationToken(String renewer) throws IOException {
+    if (!isAllowedDelegationTokenOp()) {
+      throw new IOException("Delegation Token can be issued only with kerberos authentication");
+    }
+    return secretManager.getDelegationToken(renewer);
+  }
+
+  @Override
+  public long renewDelegationToken(String tokenStrForm) throws IOException {
+    if (!isAllowedDelegationTokenOp()) {
+      throw new IOException("Delegation Token can be renewed only with kerberos authentication");
+    }
+    return secretManager.renewDelegationToken(tokenStrForm);
+  }
+
+  @Override
+  public String getDelegationToken(String renewer, String token_signature)
+      throws IOException {
+    if (!isAllowedDelegationTokenOp()) {
+      throw new IOException("Delegation Token can be issued only with kerberos authentication");
+    }
+    return secretManager.getDelegationToken(renewer, token_signature);
+  }
+
+  @Override
+  public void cancelDelegationToken(String tokenStrForm) throws IOException {
+    secretManager.cancelDelegationToken(tokenStrForm);
+  }
+
+  private boolean isAllowedDelegationTokenOp() throws IOException {
+    AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
+    if (UserGroupInformation.isSecurityEnabled()
+        && (authMethod != AuthenticationMethod.KERBEROS)
+        && (authMethod != AuthenticationMethod.KERBEROS_SSL)
+        && (authMethod != AuthenticationMethod.CERTIFICATE)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Returns the authentication method used to establish the connection.
+   * @return AuthenticationMethod used to establish the connection
+   * @throws IOException
+   **/
+  private AuthenticationMethod getConnectionAuthenticationMethod()
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
+    if (authMethod == AuthenticationMethod.PROXY) {
+      authMethod = ugi.getRealUser().getAuthenticationMethod();
+    }
+    return authMethod;
+  }
+
+  /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+  // This code is pretty much completely based on Hadoop's
+  // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+  // use that Hadoop class as-is was because it needs a Server.Connection
+  // object, which is relevant in hadoop rpc but not here in the metastore,
+  // so the code below does not deal with the Server.Connection object.
+  private static class SaslDigestCallbackHandler implements CallbackHandler {
+    private final DelegationTokenSecretManager secretManager;
+
+    public SaslDigestCallbackHandler(
+        DelegationTokenSecretManager secretManager) {
+      this.secretManager = secretManager;
+    }
+
+    private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+      return encodePassword(secretManager.retrievePassword(tokenid));
+    }
+
+    private char[] encodePassword(byte[] password) {
+      return new String(Base64.encodeBase64(password)).toCharArray();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws InvalidToken,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+            getIdentifier(nc.getDefaultName(), secretManager);
+        char[] password = getPassword(tokenIdentifier);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+              + "for client: " + tokenIdentifier.getUser());
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        if (authid.equals(authzid)) {
+          ac.setAuthorized(true);
+        } else {
+          ac.setAuthorized(false);
+        }
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            String username =
+              SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+            LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                + "canonicalized client ID: " + username);
+          }
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+
 /**
  * Processor that pulls the SaslServer object out of the transport, and
  * assumes the remote user's UGI before calling through to the original
  * processor.
@@ -175,22 +427,36 @@
  */
 private class TUGIAssumingProcessor implements TProcessor {
   final TProcessor wrapped;
-
-  TUGIAssumingProcessor(TProcessor wrapped) {
+  DelegationTokenSecretManager secretManager;
+  TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager) {
     this.wrapped = wrapped;
+    this.secretManager = secretManager;
   }
 
   public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
     TTransport trans = inProt.getTransport();
     if (!(trans instanceof TSaslServerTransport)) {
-      throw new TException("Unexpected non-SASL transport " +trans.getClass());
+      throw new TException("Unexpected non-SASL transport " + trans.getClass());
     }
     TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-    String authId = saslTrans.getSaslServer().getAuthorizationID();
+    SaslServer saslServer = saslTrans.getSaslServer();
+    String authId = saslServer.getAuthorizationID();
+    LOG.debug("AUTH ID ======>" + authId);
+    String endUser = authId;
+    if (saslServer.getMechanismName().equals("DIGEST-MD5")) {
+      try {
+        TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+            secretManager);
+        endUser = tokenId.getUser().getUserName();
+      } catch (InvalidToken e) {
+        throw new TException(e.getMessage());
+      }
+    }
     try {
       UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(
-        authId, UserGroupInformation.getLoginUser());
+        endUser, UserGroupInformation.getLoginUser());
+
       return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
         public Boolean run() {
           try {
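For the DIGEST case added above, the client never touches Kerberos: the SASL handshake is driven entirely by the token's base64-encoded identifier/password pair via SaslClientCallbackHandler. Below is a minimal illustrative sketch, not part of the patch, of how a Thrift client might open such a connection; DigestClientExample, host, and port are assumptions, while the bridge classes are the ones in this patch:

    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class DigestClientExample {
      public static TTransport openDigestTransport(String host, int port,
          String tokenStrForm) throws Exception {
        HadoopThriftAuthBridge.Client authBridge =
            ShimLoader.getHadoopThriftAuthBridge().createClient();
        // principalConfig is null for DIGEST; see the javadoc added to
        // HadoopThriftAuthBridge.Client further down in this patch.
        TTransport transport = authBridge.createClientTransport(
            null, host, "DIGEST", tokenStrForm, new TSocket(host, port));
        transport.open(); // runs the SASL DIGEST-MD5 handshake
        return transport;
      }
    }
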
Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java	(revision 0)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java	(revision 0)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * A delegation token identifier that is specific to Hive.
+ */
+public class DelegationTokenIdentifier
+    extends AbstractDelegationTokenIdentifier {
+  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+  /**
+   * Create an empty delegation token identifier for reading into.
+   */
+  public DelegationTokenIdentifier() {
+  }
+
+  /**
+   * Create a new delegation token identifier.
+   * @param owner the effective username of the token owner
+   * @param renewer the username of the renewer
+   * @param realUser the real username of the token owner
+   */
+  public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+    super(owner, renewer, realUser);
+  }
+
+  @Override
+  public Text getKind() {
+    return HIVE_DELEGATION_KIND;
+  }
+
+}
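The token kind is what ties the pieces together: DelegationTokenSelector (added below) scans a UGI's tokens for this kind, and the server's secret manager mints identifiers of this type. A small illustrative snippet, with IdentifierExample and the user names made up:

    import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;

    public class IdentifierExample {
      public static void main(String[] args) {
        DelegationTokenIdentifier id = new DelegationTokenIdentifier(
            new Text("alice"),   // owner (effective user)
            new Text("oozie"),   // renewer
            new Text("alice"));  // real user
        System.out.println(id.getKind()); // prints HIVE_DELEGATION_TOKEN
      }
    }
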
Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java	(revision 0)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java	(revision 0)
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /**
+   * Create a secret manager.
+   * @param delegationKeyUpdateInterval the number of milliseconds for rolling
+   *        new secret keys
+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+   *        tokens, in milliseconds
+   * @param delegationTokenRenewInterval how often the tokens must be renewed,
+   *        in milliseconds
+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+   *        for expired tokens, in milliseconds
+   */
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime,
+      long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    cancelToken(t, user);
+  }
+
+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    return renewToken(t, user);
+  }
+
+  public synchronized String getDelegationToken(String renewer, String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Text owner = new Text(ugi.getUserName());
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    DelegationTokenIdentifier ident =
+      new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+    Token<DelegationTokenIdentifier> t =
+      new Token<DelegationTokenIdentifier>(ident, this);
+    if (tokenSignature != null) {
+      t.setService(new Text(tokenSignature));
+    }
+    return t.encodeToUrlString();
+  }
+
+  public synchronized String getDelegationToken(String renewer) throws IOException {
+    return getDelegationToken(renewer, null);
+  }
+}
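Taken together, the secret manager supports the full issue/renew/cancel round trip once its background threads are running. The following standalone sketch assumes the caller is both owner and renewer of the token, so renewToken() and cancelToken() permit the operations; SecretManagerExample is a made-up harness, and the interval values mirror the defaults declared in HadoopThriftAuthBridge20S.Server above:

    import org.apache.hadoop.hive.thrift.DelegationTokenSecretManager;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SecretManagerExample {
      public static void main(String[] args) throws Exception {
        DelegationTokenSecretManager mgr = new DelegationTokenSecretManager(
            24 * 60 * 60 * 1000L,     // key update interval: 1 day
            7 * 24 * 60 * 60 * 1000L, // token max lifetime: 7 days
            24 * 60 * 60 * 1000L,     // renew interval: 1 day
            3600000L);                // expired-token scan interval: 1 hour
        mgr.startThreads();
        try {
          // make the current user the renewer so renew/cancel are permitted
          String renewer = UserGroupInformation.getCurrentUser().getUserName();
          String tok = mgr.getDelegationToken(renewer);
          long nextExpiry = mgr.renewDelegationToken(tok);
          System.out.println("token valid until " + nextExpiry);
          mgr.cancelDelegationToken(tok);
        } finally {
          mgr.stopThreads();
        }
      }
    }
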
Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java	(revision 0)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java	(revision 0)
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive.
+ */
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+  }
+}
Index: shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(revision 1050190)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(working copy)
@@ -23,7 +23,6 @@
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
-import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +30,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -50,6 +50,9 @@
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.tools.HadoopArchives;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -443,4 +446,19 @@
   public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
     return UserGroupInformation.getCurrentUser();
   }
+
+  @Override
+  public String getTokenStrForm() throws IOException {
+    return getTokenStrForm(null);
+  }
+
+  @Override
+  public String getTokenStrForm(String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    TokenSelector<DelegationTokenIdentifier> tokenSelector = new DelegationTokenSelector();
+
+    Token<DelegationTokenIdentifier> token = tokenSelector.selectToken(
+        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
+    return token != null ? token.encodeToUrlString() : null;
+  }
 }
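getTokenStrForm() only reads tokens; something must first put a Hive token into the UGI with a matching service text. A sketch of that producer side, e.g. front-end code preparing a job; TokenPlacementExample and addHiveToken are hypothetical names, while decodeFromUrlString/setService/addToken are the real Hadoop APIs used elsewhere in this patch:

    import java.io.IOException;

    import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    public class TokenPlacementExample {
      public static void addHiveToken(String tokenStrForm, String tokenSignature)
          throws IOException {
        Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
        t.decodeFromUrlString(tokenStrForm);
        t.setService(new Text(tokenSignature)); // what selectToken() matches on
        UserGroupInformation.getCurrentUser().addToken(t);
      }
    }
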
Index: shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java	(revision 1050190)
+++ shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java	(working copy)
@@ -20,15 +20,15 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-
 /**
  * This class is only overridden by the secure hadoop shim. It allows
  * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
- * infrastructure.
+ * & DelegationToken infrastructure.
  */
 public class HadoopThriftAuthBridge {
   public Client createClient() {
@@ -44,15 +44,33 @@
 
   public static abstract class Client {
+    /**
+     *
+     * @param principalConfig In the case of Kerberos authentication this will
+     * be the kerberos principal name; for DIGEST-MD5 (delegation token) based
+     * authentication this will be null
+     * @param host The metastore server host name
+     * @param methodStr "KERBEROS" or "DIGEST"
+     * @param tokenStrForm This is url encoded string form of
+     * org.apache.hadoop.security.token.
+     * @param underlyingTransport the underlying transport
+     * @return the transport
+     * @throws IOException
+     */
     public abstract TTransport createClientTransport(
       String principalConfig, String host,
-      String methodStr, TTransport underlyingTransport)
+      String methodStr, String tokenStrForm, TTransport underlyingTransport)
       throws IOException;
   }
 
   public static abstract class Server {
     public abstract TTransportFactory createTransportFactory() throws TTransportException;
     public abstract TProcessor wrapProcessor(TProcessor processor);
+    public abstract void startDelegationTokenSecretManager(Configuration conf) throws IOException;
+    public abstract String getDelegationToken(String renewer) throws IOException;
+    public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
+    public abstract String getDelegationToken(String renewer, String token_signature) throws IOException;
+    public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
   }
 }
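On the server side the expected call order matters: the delegation token secret manager must be started before the transport factory and processor are handed to the Thrift server, or DIGEST logins have nothing to validate against. A hypothetical wiring sketch (SecureServerWiring is made up; it mirrors the HiveMetaStore.main() changes later in this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.transport.TTransportFactory;

    public class SecureServerWiring {
      public static void wire(Configuration conf, String keytab,
          String principal, TProcessor processor) throws Exception {
        HadoopThriftAuthBridge.Server bridge =
            ShimLoader.getHadoopThriftAuthBridge().createServer(keytab, principal);
        bridge.startDelegationTokenSecretManager(conf); // must precede serving
        TTransportFactory transFactory = bridge.createTransportFactory();
        TProcessor secureProcessor = bridge.wrapProcessor(processor);
        // transFactory and secureProcessor would then be passed to the
        // Thrift server (e.g. TThreadPoolServer) exactly as HiveMetaStore does.
      }
    }
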
Index: shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java	(revision 1050190)
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java	(working copy)
@@ -21,6 +21,10 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,11 +39,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.security.UserGroupInformation;
-import javax.security.auth.login.LoginException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * In order to be compatible with multiple versions of Hadoop, all parts
  * of the Hadoop interface that are not cross-version compatible are
@@ -159,6 +159,33 @@
   public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException;
 
   /**
+   * @return the string form of any hive delegation token present in the job's
+   * credential store. This is relevant only when running against a "secure"
+   * hadoop release. The method gets hold of the tokens if they are set up by
+   * hadoop - this should happen on the map/reduce tasks if the client added
+   * the tokens into hadoop's credential store in the front end during job
+   * submission. The method will select the hive delegation token among the
+   * set of tokens and return its string form.
+   * @throws IOException
+   */
+  String getTokenStrForm() throws IOException;
+
+  /**
+   * @param tokenSignature the signature associated with the token to select
+   * @return the string form of the hive delegation token carrying the given
+   * signature, selected as described for {@link #getTokenStrForm()}
+   * @throws IOException
+   */
+  String getTokenStrForm(String tokenSignature) throws IOException;
+
+  /**
    * InputSplitShim.
    *
    */
Index: metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java	(revision 1050190)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java	(working copy)
@@ -478,4 +478,14 @@
 
   public boolean dropIndex(String db_name, String tbl_name,
       String name, boolean deleteData) throws NoSuchObjectException,
       MetaException, TException;
+
+  public String getDelegationTokenWithSignature(String renewerPrincipalName, String tokenSignature)
+      throws MetaException, TException;
+
+  public String getDelegationToken(String renewerPrincipalName)
+      throws MetaException, TException;
+
+  public long renewDelegationToken(String tokenStrForm) throws MetaException, TException;
+
+  public void cancelDelegationToken(String tokenStrForm) throws MetaException, TException;
 }
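Through IMetaStoreClient the whole token lifecycle is three calls. The snippet below is illustrative only; the principal and signature strings are placeholders, and the caller must already be authenticated via Kerberos for the get to succeed (see isAllowedDelegationTokenOp() above):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;

    public class MetaStoreTokenLifecycle {
      public static void demo(IMetaStoreClient client) throws Exception {
        String tok = client.getDelegationTokenWithSignature(
            "hive/metastore-host@EXAMPLE.COM", "hive.metastore.token");
        long nextExpiry = client.renewDelegationToken(tok);
        System.out.println("renewed until " + nextExpiry);
        client.cancelDelegationToken(tok);
      }
    }
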
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(revision 1050190)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(working copy)
@@ -66,6 +66,8 @@
   private final boolean standAloneClient = false;
   private final HiveMetaHookLoader hookLoader;
   private final HiveConf conf;
+  private String tokenStrForm;
+  private final boolean localMetaStore;
 
   // for thrift connects
   private int retries = 5;
@@ -86,7 +88,7 @@
     }
     this.conf = conf;
 
-    boolean localMetaStore = conf.getBoolean("hive.metastore.local", false);
+    localMetaStore = conf.getBoolean("hive.metastore.local", false);
     if (localMetaStore) {
       // instantiate the metastore server handler directly instead of connecting
       // through the network
@@ -183,9 +185,29 @@
       try {
         HadoopThriftAuthBridge.Client authBridge =
           ShimLoader.getHadoopThriftAuthBridge().createClient();
+
+        // check if we should use delegation tokens to authenticate.
+        // the call below gets hold of the tokens if they are set up by hadoop;
+        // this should happen on the map/reduce tasks if the client added the
+        // tokens into hadoop's credential store in the front end during job
+        // submission.
+        String tokenSig = conf.getVar(HiveConf.ConfVars.METASTORE_TOKEN_SIGNATURE);
+        if (tokenSig == null) {
+          tokenStrForm = ShimLoader.getHadoopShims().getTokenStrForm();
+        } else {
+          tokenStrForm = ShimLoader.getHadoopShims().getTokenStrForm(tokenSig);
+        }
+        if (tokenStrForm != null) {
+          // authenticate using delegation tokens via the "DIGEST" mechanism
+          transport = authBridge.createClientTransport(null, store.getHost(),
+              "DIGEST", tokenStrForm, transport);
+        } else {
         String principalConfig =
             conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
         transport = authBridge.createClientTransport(
-            principalConfig, store.getHost(), "KERBEROS",transport);
+            principalConfig, store.getHost(), "KERBEROS", null,
+            transport);
+        }
       } catch (IOException ioe) {
         LOG.error("Couldn't create client transport", ioe);
         throw new MetaException(ioe.toString());
@@ -217,6 +239,10 @@
       }
     }
 
+  public String getTokenStrForm() throws IOException {
+    return tokenStrForm;
+  }
+
   public void close() {
     open = false;
     if ((transport != null) && transport.isOpen()) {
@@ -918,4 +944,43 @@
     return client.drop_index_by_name(dbName, tblName, name, deleteData);
   }
 
+  @Override
+  public String getDelegationTokenWithSignature(String renewerPrincipalName,
+      String tokenSignature) throws MetaException, TException {
+    if (localMetaStore) {
+      throw new UnsupportedOperationException("getDelegationTokenWithSignature() can be "
+          + "called only in thrift (non local) mode");
+    }
+    return client.get_delegation_token_with_signature(renewerPrincipalName, tokenSignature);
+  }
+
+  @Override
+  public String getDelegationToken(String renewerPrincipalName) throws
+      MetaException, TException {
+    if (localMetaStore) {
+      throw new UnsupportedOperationException("getDelegationToken() can be "
+          + "called only in thrift (non local) mode");
+    }
+    return client.get_delegation_token(renewerPrincipalName);
+  }
+
+  @Override
+  public long renewDelegationToken(String tokenStrForm) throws MetaException, TException {
+    if (localMetaStore) {
+      throw new UnsupportedOperationException("renewDelegationToken() can be "
+          + "called only in thrift (non local) mode");
+    }
+    return client.renew_delegation_token(tokenStrForm);
+  }
+
+  @Override
+  public void cancelDelegationToken(String tokenStrForm) throws MetaException, TException {
+    if (localMetaStore) {
+      throw new UnsupportedOperationException("cancelDelegationToken() can be "
+          + "called only in thrift (non local) mode");
+    }
+    client.cancel_delegation_token(tokenStrForm);
+  }
 }
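The front-end glue that makes the DIGEST branch of open() fire on the task side combines the pieces above: fetch a token under a signature, park it in the credentials, and record the signature in the conf. FrontEndTokenSetup is a hypothetical sketch, not part of the patch; the token still has to be pushed into the job's credential store, e.g. as in TokenPlacementExample earlier:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    public class FrontEndTokenSetup {
      public static void setUp(HiveConf conf, String metastorePrincipal)
          throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        String signature = "metastore-token"; // arbitrary identifier
        String tok = client.getDelegationTokenWithSignature(
            metastorePrincipal, signature);
        // store tok in the UGI / job credentials under this signature, then
        // tell task-side clients which token to select:
        conf.setVar(HiveConf.ConfVars.METASTORE_TOKEN_SIGNATURE, signature);
      }
    }

Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java	(revision 1050190)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java	(working copy)
@@ -23,6 +23,7 @@
 import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
 import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -80,6 +81,8 @@
   public static final Log LOG = LogFactory.getLog(
       HiveMetaStore.class);
 
+  private static HadoopThriftAuthBridge.Server authBridge;
+
   public static class HMSHandler extends FacebookBase implements
       ThriftHiveMetastore.Iface {
     public static final Log LOG = HiveMetaStore.LOG;
@@ -2134,9 +2137,94 @@
       return ret;
     }
 
+    @Override
+    public void cancel_delegation_token(String token_str_form) throws MetaException, TException {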
+      incrementCounter("cancel_delegation_token");
+      logStartFunction("cancel_delegation_token");
+      try {
+        HiveMetaStore.cancelDelegationToken(token_str_form);
+      } catch (IOException e) {
+        throw new MetaException(e.getMessage());
+      }
+    }
+
+    @Override
+    public String get_delegation_token_with_signature(String renewer_principal_name,
+        String token_signature) throws MetaException, TException {
+      incrementCounter("get_delegation_token_with_signature");
+      logStartFunction("get_delegation_token_with_signature");
+      try {
+        return HiveMetaStore.getDelegationToken(renewer_principal_name, token_signature);
+      } catch (IOException e) {
+        throw new MetaException(e.getMessage());
+      }
+    }
+
+    @Override
+    public long renew_delegation_token(String token_str_form) throws MetaException, TException {
+      incrementCounter("renew_delegation_token");
+      logStartFunction("renew_delegation_token");
+      try {
+        return HiveMetaStore.renewDelegationToken(token_str_form);
+      } catch (IOException e) {
+        throw new MetaException(e.getMessage());
+      }
+    }
+
+    @Override
+    public String get_delegation_token(String renewer_principal_name)
+        throws MetaException, TException {
+      incrementCounter("get_delegation_token");
+      logStartFunction("get_delegation_token");
+      try {
+        return HiveMetaStore.getDelegationToken(renewer_principal_name);
+      } catch (IOException e) {
+        throw new MetaException(e.getMessage());
+      }
+    }
   }
 
+  /**
+   * Discard a current delegation token.
+   */
+  public static void cancelDelegationToken(String tokenStrForm
+      ) throws IOException {
+    authBridge.cancelDelegationToken(tokenStrForm);
+  }
+
+  /**
+   * Get a new delegation token.
+   * @param renewer the principal allowed to renew the token
+   * @param token_signature an identifier to associate with the token
+   */
+  public static String getDelegationToken(String renewer, String token_signature
+      ) throws IOException {
+    return authBridge.getDelegationToken(renewer, token_signature);
+  }
+
+  /**
+   * Get a new delegation token.
+   * @param renewer the principal allowed to renew the token
+   */
+  public static String getDelegationToken(String renewer) throws IOException {
+    return authBridge.getDelegationToken(renewer);
+  }
+
+  /**
+   * Renew a delegation token to extend its lifetime.
+   */
+  public static long renewDelegationToken(String tokenStrForm
+      ) throws IOException {
+    return authBridge.renewDelegationToken(tokenStrForm);
+  }
+
   /**
    * @param args
    */
   public static void main(String[] args) {
@@ -2167,6 +2255,9 @@
-        HadoopThriftAuthBridge.Server authBridge = ShimLoader.getHadoopThriftAuthBridge().createServer(
+        authBridge = ShimLoader.getHadoopThriftAuthBridge().createServer(
           conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE),
           conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
+
+        // start delegation token manager
+        authBridge.startDelegationTokenSecretManager(conf);
         transFactory = authBridge.createTransportFactory();
         processor = authBridge.wrapProcessor(processor);
       } else {
Index: metastore/if/hive_metastore.thrift
===================================================================
--- metastore/if/hive_metastore.thrift	(revision 1050190)
+++ metastore/if/hive_metastore.thrift	(working copy)
@@ -272,6 +272,21 @@
       throws(1:NoSuchObjectException o1, 2:MetaException o2)
   list<string> get_index_names(1:string db_name, 2:string tbl_name, 3:i16 max_indexes=-1)
                        throws(1:MetaException o2)
+  // get metastore server delegation token for use from the map/reduce tasks to authenticate
+  // to metastore server
+  string get_delegation_token(1:string renewer_principal_name) throws (1:MetaException o1)
+
+  // get metastore server delegation token for use from the map/reduce tasks to authenticate
+  // to metastore server - this method takes an extra token signature string which is just
+  // an identifier to associate with the token - this will be used by the token selector code
+  // to pick the right token given the associated identifier.
+  string get_delegation_token_with_signature(1:string renewer_principal_name, 2:string token_signature) throws (1:MetaException o1)
+
+  // method to renew delegation token obtained from metastore server
+  i64 renew_delegation_token(1:string token_str_form) throws (1:MetaException o1)
+
+  // method to cancel delegation token obtained from metastore server
+  void cancel_delegation_token(1:string token_str_form) throws (1:MetaException o1)
 }
 
 // * Note about the DDL_TIME: When creating or altering a table or a partition,
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1050190)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -328,6 +328,8 @@
     // Print column names in output
     HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false),
 
+    METASTORE_TOKEN_SIGNATURE("metastore.token.signature", null),
+
     HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false);
 
     ;
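
Reading the new variable back shows the contract open() relies on: getVar() returns null when no signature was configured, which routes token selection to the default (empty) service text. A small illustrative reader, with TokenSignatureRead made up and the HiveConf(Class) constructor assumed available as used elsewhere in Hive:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class TokenSignatureRead {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(HiveConf.class);
        String sig = conf.getVar(HiveConf.ConfVars.METASTORE_TOKEN_SIGNATURE);
        System.out.println(sig == null ? "no token signature set" : sig);
      }
    }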