diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 3ee30ed..3454009 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -54,7 +54,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java index 9b62195..3ad60e4 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.thrift.DelegationTokenSelector; +import org.apache.hadoop.hive.token.delegation.DelegationTokenSelector; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 15ab8b9..1e23462 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -156,8 +156,8 @@ private String addHMSToken(Job job, String user) throws IOException, Interrupted if(!secureMetastoreAccess) { return null; } - Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier> hiveToken = - new Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier>(); + Token<org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier> hiveToken = + new Token<org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier>(); String metastoreTokenStrForm = buildHcatDelegationToken(user); hiveToken.decodeFromUrlString(metastoreTokenStrForm); job.getCredentials().addToken(new diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoopAuthBridge23.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoopAuthBridge23.java index 6d0776a..faa1bf5 100644 --- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoopAuthBridge23.java +++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoopAuthBridge23.java @@ -27,6 +27,11 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hive.token.delegation.DelegationTokenStore; +import org.apache.hadoop.hive.token.delegation.HiveDelegationTokenManager; +import org.apache.hadoop.hive.token.delegation.MemoryTokenStore; +import org.apache.hadoop.hive.token.delegation.TokenStoreDelegationTokenSecretManager; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SaslRpcServer; import
org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -91,15 +96,16 @@ public TTransportFactory createTransportFactory(Map saslProps) } static DelegationTokenStore TOKEN_STORE = new MemoryTokenStore(); - @Override protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException { return TOKEN_STORE; } - @Override public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode sm) throws IOException{ - super.startDelegationTokenSecretManager(conf, hms, sm); + // Start delegation token manager + HiveDelegationTokenManager delegationTokenManager = + new HiveDelegationTokenManager(super.getRemoteAddress()); + delegationTokenManager.startDelegationTokenSecretManager(conf, hms, sm); isMetastoreTokenManagerInited = true; } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index c3a17c1..3eedc50 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -263,12 +263,13 @@ private TTransport createHttpTransport() throws SQLException, TTransportExceptio } private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException { - boolean isCookieEnabled = sessConfMap.get(JdbcConnectionParams.COOKIE_AUTH) == null || - (!JdbcConnectionParams.COOKIE_AUTH_FALSE.equalsIgnoreCase( - sessConfMap.get(JdbcConnectionParams.COOKIE_AUTH))); - String cookieName = sessConfMap.get(JdbcConnectionParams.COOKIE_NAME) == null ? - JdbcConnectionParams.DEFAULT_COOKIE_NAMES_HS2 : - sessConfMap.get(JdbcConnectionParams.COOKIE_NAME); + boolean isCookieEnabled = + sessConfMap.get(JdbcConnectionParams.COOKIE_AUTH) == null + || (!JdbcConnectionParams.COOKIE_AUTH_FALSE.equalsIgnoreCase(sessConfMap + .get(JdbcConnectionParams.COOKIE_AUTH))); + String cookieName = + sessConfMap.get(JdbcConnectionParams.COOKIE_NAME) == null ? JdbcConnectionParams.DEFAULT_COOKIE_NAMES_HS2 : sessConfMap + .get(JdbcConnectionParams.COOKIE_NAME); CookieStore cookieStore = isCookieEnabled ? new BasicCookieStore() : null; HttpClientBuilder httpClientBuilder; // Request interceptor for any request pre-processing logic @@ -281,59 +282,65 @@ private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException { if (key.startsWith(JdbcConnectionParams.HTTP_HEADER_PREFIX)) { additionalHttpHeaders.put(key.substring(JdbcConnectionParams.HTTP_HEADER_PREFIX.length()), - entry.getValue()); + entry.getValue()); } } // Configure http client for kerberos/password based authentication if (isKerberosAuthMode()) { /** - * Add an interceptor which sets the appropriate header in the request. - * It does the kerberos authentication and get the final service ticket, - * for sending to the server before every request. - * In https mode, the entire information is encrypted + * Add an interceptor which sets the appropriate header in the request. It does the kerberos + * authentication and get the final service ticket, for sending to the server before every + * request. In https mode, the entire information is encrypted */ requestInterceptor = new HttpKerberosRequestInterceptor(sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, getServerHttpUrl(useSsl), assumeSubject, cookieStore, cookieName, useSsl, additionalHttpHeaders); - } - else { - /** - * Add an interceptor to pass username/password in the header. 
- * In https mode, the entire information is encrypted - */ - requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword(), - cookieStore, cookieName, useSsl, - additionalHttpHeaders); + } else { + // Check for delegation token, if present add it in the header + String tokenStr = getClientDelegationToken(sessConfMap); + if (tokenStr != null) { + requestInterceptor = + new HttpTokenAuthInterceptor(tokenStr, cookieStore, cookieName, useSsl, + additionalHttpHeaders); + } else { + /** + * Add an interceptor to pass username/password in the header. In https mode, the entire + * information is encrypted + */ + requestInterceptor = + new HttpBasicAuthInterceptor(getUserName(), getPassword(), cookieStore, cookieName, + useSsl, additionalHttpHeaders); + } } // Configure http client for cookie based authentication if (isCookieEnabled) { // Create a http client with a retry mechanism when the server returns a status code of 401. httpClientBuilder = - HttpClients.custom().setServiceUnavailableRetryStrategy( - new ServiceUnavailableRetryStrategy() { - - @Override - public boolean retryRequest( - final HttpResponse response, - final int executionCount, - final HttpContext context) { - int statusCode = response.getStatusLine().getStatusCode(); - boolean ret = statusCode == 401 && executionCount <= 1; - - // Set the context attribute to true which will be interpreted by the request interceptor - if (ret) { - context.setAttribute(Utils.HIVE_SERVER2_RETRY_KEY, Utils.HIVE_SERVER2_RETRY_TRUE); - } - return ret; - } - - @Override - public long getRetryInterval() { - // Immediate retry - return 0; - } - }); + HttpClients.custom().setServiceUnavailableRetryStrategy( + new ServiceUnavailableRetryStrategy() { + + @Override + public boolean retryRequest(final HttpResponse response, final int executionCount, + final HttpContext context) { + int statusCode = response.getStatusLine().getStatusCode(); + boolean ret = statusCode == 401 && executionCount <= 1; + + // Set the context attribute to true which will be interpreted by the request + // interceptor + if (ret) { + context.setAttribute(Utils.HIVE_SERVER2_RETRY_KEY, + Utils.HIVE_SERVER2_RETRY_TRUE); + } + return ret; + } + + @Override + public long getRetryInterval() { + // Immediate retry + return 0; + } + }); } else { httpClientBuilder = HttpClientBuilder.create(); } @@ -343,26 +350,12 @@ public long getRetryInterval() { if (useSsl) { String useTwoWaySSL = sessConfMap.get(JdbcConnectionParams.USE_TWO_WAY_SSL); String sslTrustStorePath = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE); - String sslTrustStorePassword = sessConfMap.get( - JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD); + String sslTrustStorePassword = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD); KeyStore sslTrustStore; SSLSocketFactory socketFactory; - /** - * The code within the try block throws: - * 1. SSLInitializationException - * 2. KeyStoreException - * 3. IOException - * 4. NoSuchAlgorithmException - * 5. CertificateException - * 6. KeyManagementException - * 7. UnrecoverableKeyException - * We don't want the client to retry on any of these, hence we catch all - * and throw a SQLException. 
- */ try { - if (useTwoWaySSL != null && - useTwoWaySSL.equalsIgnoreCase(JdbcConnectionParams.TRUE)) { + if (useTwoWaySSL != null && useTwoWaySSL.equalsIgnoreCase(JdbcConnectionParams.TRUE)) { socketFactory = getTwoWaySSLSocketFactory(); } else if (sslTrustStorePath == null || sslTrustStorePath.isEmpty()) { // Create a default socket factory based on standard JSSE trust material @@ -378,15 +371,13 @@ public long getRetryInterval() { socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); final Registry<ConnectionSocketFactory> registry = - RegistryBuilder.<ConnectionSocketFactory>create() - .register("https", socketFactory) - .build(); + RegistryBuilder.<ConnectionSocketFactory> create().register("https", socketFactory) + .build(); httpClientBuilder.setConnectionManager(new BasicHttpClientConnectionManager(registry)); - } - catch (Exception e) { - String msg = "Could not create an https connection to " + - jdbcUriString + ". " + e.getMessage(); + } catch (Exception e) { + String msg = + "Could not create an https connection to " + jdbcUriString + ". " + e.getMessage(); throw new SQLException(msg, " 08S01", e); } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HttpTokenAuthInterceptor.java b/jdbc/src/java/org/apache/hive/jdbc/HttpTokenAuthInterceptor.java new file mode 100644 index 0000000..72937a6 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/HttpTokenAuthInterceptor.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hive.jdbc; + +import java.util.Map; + +import org.apache.http.HttpRequest; +import org.apache.http.client.CookieStore; +import org.apache.http.protocol.HttpContext; + +/** + * The class is instantiated with a delegation token; it is then + * used to add the token as a header to each HTTP request + * + */ +public class HttpTokenAuthInterceptor extends HttpRequestInterceptorBase { + private String tokenStr; + + public HttpTokenAuthInterceptor(String tokenStr, CookieStore cookieStore, String cn, + boolean isSSL, Map<String, String> additionalHeaders) { + super(cookieStore, cn, isSSL, additionalHeaders); + this.tokenStr = tokenStr; + } + + @Override + protected void addHttpAuthHeader(HttpRequest httpRequest, HttpContext httpContext) + throws Exception { + String delegationTokenHeader = "X-Hadoop-Delegation-Token"; + httpRequest.addHeader(delegationTokenHeader, tokenStr); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index bfebfdc..cd724e0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.hive.thrift.TUGIContainingTransport; +import org.apache.hadoop.hive.token.delegation.HiveDelegationTokenManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -178,6 +179,7 @@ protected DateFormat initialValue() { public static final String PUBLIC = "public"; private static HadoopThriftAuthBridge.Server saslServer; + private static HiveDelegationTokenManager delegationTokenManager = null; private static boolean useSasl; private static final class ChainedTTransportFactory extends TTransportFactory { @@ -5979,7 +5981,7 @@ public static Iface newRetryingHMSHandler(String name, HiveConf conf, boolean lo */ public static void cancelDelegationToken(String tokenStrForm ) throws IOException { - saslServer.cancelDelegationToken(tokenStrForm); + delegationTokenManager.cancelDelegationToken(tokenStrForm); } /** @@ -5990,7 +5992,7 @@ public static void cancelDelegationToken(String tokenStrForm */ public static String getDelegationToken(String owner, String renewer) throws IOException, InterruptedException { - return saslServer.getDelegationToken(owner, renewer); + return delegationTokenManager.getDelegationToken(owner, renewer); } /** @@ -6008,7 +6010,7 @@ public static boolean isMetaStoreRemote() { */ public static long renewDelegationToken(String tokenStrForm ) throws IOException { - return saslServer.renewDelegationToken(tokenStrForm); + return delegationTokenManager.renewDelegationToken(tokenStrForm); } /** @@ -6223,8 +6225,10 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, saslServer = bridge.createServer( conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE), conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); - // start delegation token manager - saslServer.startDelegationTokenSecretManager(conf, baseHandler, ServerMode.METASTORE); + // Start delegation token manager + delegationTokenManager = new HiveDelegationTokenManager(saslServer.getRemoteAddress()); +
delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, + ServerMode.METASTORE); transFactory = saslServer.createTransportFactory( MetaStoreUtils.getMetaStoreSaslProperties(conf)); processor = saslServer.wrapProcessor( diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index 0c7455d..a77ba55 100644 --- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -33,7 +34,6 @@ import javax.security.sasl.AuthenticationException; import javax.security.sasl.Sasl; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStore; @@ -41,12 +41,14 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.thrift.DBTokenStore; +import org.apache.hadoop.hive.token.delegation.DBTokenStore; +import org.apache.hadoop.hive.token.delegation.HiveDelegationTokenManager; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.thrift.TProcessorFactory; @@ -93,11 +95,12 @@ public String getAuthName() { private final String transportMode; private final HiveConf conf; private String hadoopAuth; + private HiveDelegationTokenManager delegationTokenManager = null; public static final String HS2_PROXY_USER = "hive.server2.proxy.user"; public static final String HS2_CLIENT_TOKEN = "hiveserver2ClientToken"; - public HiveAuthFactory(HiveConf conf) throws TTransportException { + public HiveAuthFactory(HiveConf conf, InetAddress serverIPAddress) throws TTransportException { this.conf = conf; transportMode = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); authTypeStr = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION); @@ -114,25 +117,29 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException { authTypeStr = AuthTypes.NONE.getAuthName(); } } - if (hadoopAuth.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName()) - && !authTypeStr.equalsIgnoreCase(AuthTypes.NOSASL.getAuthName())) { - saslServer = ShimLoader.getHadoopThriftAuthBridge().createServer( - conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB), - conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); - // start delegation token manager - try { - // rawStore is only necessary for DBTokenStore - HMSHandler baseHandler = null; - String tokenStoreClass = conf.getVar(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS); - - if (tokenStoreClass.equals(DBTokenStore.class.getName())) { - baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, true); - } - - 
saslServer.startDelegationTokenSecretManager(conf, baseHandler, ServerMode.HIVESERVER2); - } - catch (MetaException|IOException e) { - throw new TTransportException("Failed to start token manager", e); + if (hadoopAuth.equalsIgnoreCase("kerberos") + && !authTypeStr.equalsIgnoreCase(AuthTypes.NOSASL.getAuthName())) { + saslServer = + ShimLoader.getHadoopThriftAuthBridge().createServer( + conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB), + conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); + } + // Start delegation token manager + if (hadoopAuth.equalsIgnoreCase("kerberos") + && !authTypeStr.equalsIgnoreCase(AuthTypes.NOSASL.getAuthName())) { + delegationTokenManager = new HiveDelegationTokenManager(serverIPAddress); + try { + // RawStore is only necessary for DBTokenStore + HMSHandler baseHandler = null; + String tokenStoreClass = + conf.getVar(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS); + if (tokenStoreClass.equals(DBTokenStore.class.getName())) { + baseHandler = new HiveMetaStore.HMSHandler("New db based metastore server", conf, true); + } + delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, + ServerMode.HIVESERVER2); + } catch (MetaException | IOException e) { + throw new ServiceException("Failed to start token manager", e); } } } @@ -159,7 +166,7 @@ public TTransportFactory getAuthTransFactory() throws LoginException { } if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) { // no-op - } else if (authTypeStr.equalsIgnoreCase(AuthTypes.NONE.getAuthName()) || + } else if (authTypeStr.equalsIgnoreCase(AuthTypes.NONE.getAuthName()) || authTypeStr.equalsIgnoreCase(AuthTypes.LDAP.getAuthName()) || authTypeStr.equalsIgnoreCase(AuthTypes.PAM.getAuthName()) || authTypeStr.equalsIgnoreCase(AuthTypes.CUSTOM.getAuthName())) { @@ -168,7 +175,7 @@ public TTransportFactory getAuthTransFactory() throws LoginException { authTypeStr, null, new HashMap(), new PlainSaslHelper.PlainServerCallbackHandler(authTypeStr)); } catch (AuthenticationException e) { - throw new LoginException ("Error setting callback handler" + e); + throw new LoginException ("Error setting callback handler" + e); } } else { throw new LoginException("Unsupported authentication type " + authTypeStr); @@ -304,13 +311,13 @@ public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, Str // retrieve delegation token for the given user public String getDelegationToken(String owner, String renewer) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - String tokenStr = saslServer.getDelegationTokenWithService(owner, renewer, HS2_CLIENT_TOKEN); + String tokenStr = delegationTokenManager.getDelegationTokenWithService(owner, renewer, HS2_CLIENT_TOKEN); if (tokenStr == null || tokenStr.isEmpty()) { throw new HiveSQLException( "Received empty retrieving delegation token for user " + owner, "08S01"); @@ -326,12 +333,12 @@ public String getDelegationToken(String owner, String renewer) throws HiveSQLExc // cancel given delegation token public void cancelDelegationToken(String delegationToken) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - saslServer.cancelDelegationToken(delegationToken); + delegationTokenManager.cancelDelegationToken(delegationToken); } catch 
(IOException e) { throw new HiveSQLException( "Error canceling delegation token " + delegationToken, "08S01", e); @@ -339,12 +346,12 @@ public void cancelDelegationToken(String delegationToken) throws HiveSQLExceptio } public void renewDelegationToken(String delegationToken) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - saslServer.renewDelegationToken(delegationToken); + delegationTokenManager.renewDelegationToken(delegationToken); } catch (IOException e) { throw new HiveSQLException( "Error renewing delegation token " + delegationToken, "08S01", e); @@ -352,12 +359,12 @@ public void renewDelegationToken(String delegationToken) throws HiveSQLException } public String getUserFromToken(String delegationToken) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - return saslServer.getUserFromToken(delegationToken); + return delegationTokenManager.getUserFromToken(delegationToken); } catch (IOException e) { throw new HiveSQLException( "Error extracting user from delegation token " + delegationToken, "08S01", e); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index cf575a4..5eb6c00 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -56,7 +56,7 @@ public void run() { oomHook); // Thrift configs - hiveAuthFactory = new HiveAuthFactory(hiveConf); + hiveAuthFactory = new HiveAuthFactory(hiveConf, serverIPAddress); TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); TServerSocket serverSocket = null; diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 8dff264..00c6e68 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.AbstractService; import org.apache.hive.service.ServiceException; -import org.apache.hive.service.ServiceUtils; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.TSetIpAddressProcessor; import org.apache.hive.service.cli.CLIService; @@ -124,18 +123,14 @@ protected String hiveHost; protected TServer server; protected org.eclipse.jetty.server.Server httpServer; - - private boolean isStarted = false; protected boolean isEmbedded = false; - protected HiveConf hiveConf; - protected int minWorkerThreads; protected int maxWorkerThreads; protected long workerKeepAliveTime; - protected TServerEventHandler serverEventHandler; protected ThreadLocal currentServerContext; + private boolean isStarted = false; static class ThriftCLIServerContext implements ServerContext { private SessionHandle sessionHandle = null; @@ -483,7 +478,7 @@ private String getDelegationToken(String userName) return cliService.getDelegationTokenFromMetaStore(userName); } catch 
(UnsupportedOperationException e) { // The delegation token is not applicable in the given deployment mode - // such as HMS is not kerberos secured + // such as HMS is not kerberos secured } return null; } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index cafe21f..a398768 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -113,7 +113,7 @@ public void run() { httpServer.addConnector(connector); // Thrift configs - hiveAuthFactory = new HiveAuthFactory(hiveConf); + hiveAuthFactory = new HiveAuthFactory(hiveConf, serverIPAddress); TProcessor processor = new TCLIService.Processor(this); TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); // Set during the init phase of HiveServer2 if auth mode is kerberos diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java index df16544..62e0f87 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java @@ -39,6 +39,10 @@ import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.server.AuthenticationToken; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticationHandler; import org.apache.hive.service.CookieSigner; import org.apache.hive.service.auth.AuthenticationProviderFactory; import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods; @@ -132,7 +136,13 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) if (clientUserName == null) { // For a kerberos setup if (isKerberosAuthMode(authType)) { - clientUserName = doKerberosAuth(request); + String delegationToken = request.getHeader(DelegationTokenAuthenticator.DELEGATION_TOKEN_HEADER); + // Each http request must have an Authorization header + if ((delegationToken != null) && (!delegationToken.isEmpty())) { + clientUserName = doTokenAuth(request, response); + } else { + clientUserName = doKerberosAuth(request); + } } // For password based authentication else { @@ -331,6 +341,19 @@ private String doPasswdAuth(HttpServletRequest request, String authType) return userName; } + private String doTokenAuth(HttpServletRequest request, HttpServletResponse response) + throws HttpAuthenticationException { + DelegationTokenAuthenticationHandler tokenAuthHandler = + new KerberosDelegationTokenAuthenticationHandler(); + try { + AuthenticationToken authenticatedToken = tokenAuthHandler.authenticate(request, response); + String userName = authenticatedToken.getUserName(); + return userName; + } catch (Exception e) { + throw new HttpAuthenticationException(e); + } + } + /** * Do the GSS-API kerberos authentication. 
* We already have a logged in subject in the form of serviceUGI, diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java index 4bcb8c3..f4de22c 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java @@ -26,8 +26,8 @@ import javax.security.auth.login.LoginException; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; -import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; -import org.apache.hadoop.hive.thrift.DelegationTokenSelector; +import org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hive.token.delegation.DelegationTokenSelector; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java deleted file mode 100644 index de39d3d..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DBTokenStore implements DelegationTokenStore { - private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); - - @Override - public int addMasterKey(String s) throws TokenStoreException { - if (LOG.isTraceEnabled()) { - LOG.trace("addMasterKey: s = " + s); - } - return (Integer)invokeOnRawStore("addMasterKey", new Object[]{s},String.class); - } - - @Override - public void updateMasterKey(int keySeq, String s) throws TokenStoreException { - if (LOG.isTraceEnabled()) { - LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); - } - invokeOnRawStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s}, - Integer.class, String.class); - } - - @Override - public boolean removeMasterKey(int keySeq) { - return (Boolean)invokeOnRawStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)}, - Integer.class); - } - - @Override - public String[] getMasterKeys() throws TokenStoreException { - return (String[])invokeOnRawStore("getMasterKeys", new Object[0]); - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) throws TokenStoreException { - - try { - String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); - String tokenStr = Base64.encodeBase64URLSafeString( - HiveDelegationTokenSupport.encodeDelegationTokenInformation(token)); - boolean result = (Boolean)invokeOnRawStore("addToken", new Object[] {identifier, tokenStr}, - String.class, String.class); - if (LOG.isTraceEnabled()) { - LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) - throws TokenStoreException { - try { - String tokenStr = (String)invokeOnRawStore("getToken", new Object[] { - TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); - DelegationTokenInformation result = null; - if (tokenStr != null) { - result = HiveDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr)); - } - if (LOG.isTraceEnabled()) { - LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException{ - try { - boolean result = (Boolean)invokeOnRawStore("removeToken", new Object[] { - TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); - if (LOG.isTraceEnabled()) { - LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - 
@Override - public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ - - List tokenIdents = (List)invokeOnRawStore("getAllTokenIdentifiers", new Object[0]); - List delTokenIdents = new ArrayList(tokenIdents.size()); - - for (String tokenIdent : tokenIdents) { - DelegationTokenIdentifier delToken = new DelegationTokenIdentifier(); - try { - TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent); - } catch (IOException e) { - throw new TokenStoreException(e); - } - delTokenIdents.add(delToken); - } - return delTokenIdents; - } - - private Object hmsHandler; - - @Override - public void init(Object hms, ServerMode smode) throws TokenStoreException { - this.hmsHandler = hms; - } - - private Object invokeOnRawStore(String methName, Object[] params, Class ... paramTypes) - throws TokenStoreException{ - - try { - Object rawStore = hmsHandler.getClass().getMethod("getMS").invoke(hmsHandler); - return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params); - } catch (IllegalArgumentException e) { - throw new TokenStoreException(e); - } catch (SecurityException e) { - throw new TokenStoreException(e); - } catch (IllegalAccessException e) { - throw new TokenStoreException(e); - } catch (InvocationTargetException e) { - throw new TokenStoreException(e.getCause()); - } catch (NoSuchMethodException e) { - throw new TokenStoreException(e); - } - } - - @Override - public void setConf(Configuration conf) { - // No-op - } - - @Override - public Configuration getConf() { - return null; - } - - @Override - public void close() throws IOException { - // No-op. - } - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java deleted file mode 100644 index 4ca3c0b..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; - -/** - * A delegation token identifier that is specific to Hive. - */ -public class DelegationTokenIdentifier - extends AbstractDelegationTokenIdentifier { - public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN"); - - /** - * Create an empty delegation token identifier for reading into. 
- */ - public DelegationTokenIdentifier() { - } - - /** - * Create a new delegation token identifier - * @param owner the effective username of the token owner - * @param renewer the username of the renewer - * @param realUser the real username of the token owner - */ - public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) { - super(owner, renewer, realUser); - } - - @Override - public Text getKind() { - return HIVE_DELEGATION_KIND; - } - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java deleted file mode 100644 index 19d1fbf..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; - -/** - * A Hive specific delegation token secret manager. - * The secret manager is responsible for generating and accepting the password - * for each token. - */ -public class DelegationTokenSecretManager - extends AbstractDelegationTokenSecretManager { - - /** - * Create a secret manager - * @param delegationKeyUpdateInterval the number of seconds for rolling new - * secret keys. 
- * @param delegationTokenMaxLifetime the maximum lifetime of the delegation - * tokens - * @param delegationTokenRenewInterval how often the tokens must be renewed - * @param delegationTokenRemoverScanInterval how often the tokens are scanned - * for expired tokens - */ - public DelegationTokenSecretManager(long delegationKeyUpdateInterval, - long delegationTokenMaxLifetime, - long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { - super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, - delegationTokenRenewInterval, delegationTokenRemoverScanInterval); - } - - @Override - public DelegationTokenIdentifier createIdentifier() { - return new DelegationTokenIdentifier(); - } - - public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException { - Token t= new Token(); - t.decodeFromUrlString(tokenStrForm); - String user = UserGroupInformation.getCurrentUser().getUserName(); - cancelToken(t, user); - } - - public synchronized long renewDelegationToken(String tokenStrForm) throws IOException { - Token t= new Token(); - t.decodeFromUrlString(tokenStrForm); - String user = UserGroupInformation.getCurrentUser().getUserName(); - return renewToken(t, user); - } - - public synchronized String getDelegationToken(String renewer) throws IOException { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - Text owner = new Text(ugi.getUserName()); - Text realUser = null; - if (ugi.getRealUser() != null) { - realUser = new Text(ugi.getRealUser().getUserName()); - } - DelegationTokenIdentifier ident = - new DelegationTokenIdentifier(owner, new Text(renewer), realUser); - Token t = new Token( - ident, this); - return t.encodeToUrlString(); - } - - public String getUserFromToken(String tokenStr) throws IOException { - Token delegationToken = new Token(); - delegationToken.decodeFromUrlString(tokenStr); - - ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier()); - DataInputStream in = new DataInputStream(buf); - DelegationTokenIdentifier id = createIdentifier(); - id.readFields(in); - return id.getUser().getShortUserName(); - } -} - diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java deleted file mode 100644 index f6e2420..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; - -/** - * A delegation token that is specialized for Hive - */ - -public class DelegationTokenSelector - extends AbstractDelegationTokenSelector{ - - public DelegationTokenSelector() { - super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND); - } -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java deleted file mode 100644 index 867b4ed..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.thrift; - -import java.io.Closeable; -import java.util.List; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; - -/** - * Interface for pluggable token store that can be implemented with shared external - * storage for load balancing and high availability (for example using ZooKeeper). - * Internal, store specific errors are translated into {@link TokenStoreException}. - */ -public interface DelegationTokenStore extends Configurable, Closeable { - - /** - * Exception for internal token store errors that typically cannot be handled by the caller. - */ - public static class TokenStoreException extends RuntimeException { - private static final long serialVersionUID = -8693819817623074083L; - - public TokenStoreException(Throwable cause) { - super(cause); - } - - public TokenStoreException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Add new master key. The token store assigns and returns the sequence number. - * Caller needs to use the identifier to update the key (since it is embedded in the key). - * - * @param s - * @return sequence number for new key - */ - int addMasterKey(String s) throws TokenStoreException; - - /** - * Update master key (for expiration and setting store assigned sequence within key) - * @param keySeq - * @param s - * @throws TokenStoreException - */ - void updateMasterKey(int keySeq, String s) throws TokenStoreException; - - /** - * Remove key for given id. - * @param keySeq - * @return false if key no longer present, true otherwise. - */ - boolean removeMasterKey(int keySeq); - - /** - * Return all master keys. - * @return - * @throws TokenStoreException - */ - String[] getMasterKeys() throws TokenStoreException; - - /** - * Add token. If identifier is already present, token won't be added. 
- * @param tokenIdentifier - * @param token - * @return true if token was added, false for existing identifier - */ - boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) throws TokenStoreException; - - /** - * Get token. Returns null if the token does not exist. - * @param tokenIdentifier - * @return - */ - DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) - throws TokenStoreException; - - /** - * Remove token. Return value can be used by caller to detect concurrency. - * @param tokenIdentifier - * @return true if token was removed, false if it was already removed. - * @throws TokenStoreException - */ - boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; - - /** - * List of all token identifiers in the store. This is used to remove expired tokens - * and a potential scalability improvement would be to partition by master key id - * @return - */ - List getAllDelegationTokenIdentifiers() throws TokenStoreException; - - /** - * @param hmsHandler ObjectStore used by DBTokenStore - * @param smode Indicate whether this is a metastore or hiveserver2 token store - */ - void init(Object hmsHandler, ServerMode smode); - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java index d3b2ff5..04b7a6d 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java @@ -39,25 +39,21 @@ import javax.security.sasl.SaslServer; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport; +import org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hive.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocol; @@ -377,7 +373,7 @@ public TTransportFactory createTransportFactory(Map saslProps) /** * Create a TSaslServerTransport.Factory that, upon connection of a client - * socket, negotiates a Kerberized SASL transport. + * socket, negotiates a Kerberized SASL transport. 
* * @param saslProps Map of SASL properties */ @@ -430,109 +426,6 @@ public TProcessor wrapNonAssumingProcessor(TProcessor processor) { return new TUGIAssumingProcessor(processor, secretManager, false); } - protected DelegationTokenStore getTokenStore(Configuration conf) - throws IOException { - String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); - if (StringUtils.isBlank(tokenStoreClassName)) { - return new MemoryTokenStore(); - } - try { - Class storeClass = Class - .forName(tokenStoreClassName).asSubclass( - DelegationTokenStore.class); - return ReflectionUtils.newInstance(storeClass, conf); - } catch (ClassNotFoundException e) { - throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, - e); - } - } - - - public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode smode) - throws IOException { - long secretKeyInterval = - conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, - DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); - long tokenMaxLifetime = - conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, - DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); - long tokenRenewInterval = - conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, - DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - long tokenGcInterval = conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, - DELEGATION_TOKEN_GC_INTERVAL_DEFAULT); - - DelegationTokenStore dts = getTokenStore(conf); - dts.init(hms, smode); - secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, - tokenRenewInterval, - tokenGcInterval, dts); - secretManager.startThreads(); - } - - - public String getDelegationToken(final String owner, final String renewer) - throws IOException, InterruptedException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - //if the user asking the token is same as the 'owner' then don't do - //any proxy authorization checks. For cases like oozie, where it gets - //a delegation token for another user, we need to make sure oozie is - //authorized to get a delegation token. - //Do all checks on short names - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); - if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { - //in the case of proxy users, the getCurrentUser will return the - //real user (for e.g. 
oozie) due to the doAs that happened just before the - //server started executing the method getDelegationToken in the MetaStore - ownerUgi = UserGroupInformation.createProxyUser(owner, - UserGroupInformation.getCurrentUser()); - InetAddress remoteAddr = getRemoteAddress(); - ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); - } - return ownerUgi.doAs(new PrivilegedExceptionAction() { - - @Override - public String run() throws IOException { - return secretManager.getDelegationToken(renewer); - } - }); - } - - - public String getDelegationTokenWithService(String owner, String renewer, String service) - throws IOException, InterruptedException { - String token = getDelegationToken(owner, renewer); - return Utils.addServiceToToken(token, service); - } - - - public long renewDelegationToken(String tokenStrForm) throws IOException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - return secretManager.renewDelegationToken(tokenStrForm); - } - - - public String getUserFromToken(String tokenStr) throws IOException { - return secretManager.getUserFromToken(tokenStr); - } - - - public void cancelDelegationToken(String tokenStrForm) throws IOException { - secretManager.cancelDelegationToken(tokenStrForm); - } - final static ThreadLocal remoteAddress = new ThreadLocal() { @@ -542,7 +435,6 @@ protected InetAddress initialValue() { } }; - public InetAddress getRemoteAddress() { return remoteAddress.get(); } diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java deleted file mode 100644 index 9d837b8..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Default in-memory token store implementation. 
- */ -public class MemoryTokenStore implements DelegationTokenStore { - private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class); - - private final Map masterKeys - = new ConcurrentHashMap(); - - private final ConcurrentHashMap tokens - = new ConcurrentHashMap(); - - private final AtomicInteger masterKeySeq = new AtomicInteger(); - private Configuration conf; - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return this.conf; - } - - @Override - public int addMasterKey(String s) { - int keySeq = masterKeySeq.getAndIncrement(); - if (LOG.isTraceEnabled()) { - LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq); - } - masterKeys.put(keySeq, s); - return keySeq; - } - - @Override - public void updateMasterKey(int keySeq, String s) { - if (LOG.isTraceEnabled()) { - LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); - } - masterKeys.put(keySeq, s); - } - - @Override - public boolean removeMasterKey(int keySeq) { - if (LOG.isTraceEnabled()) { - LOG.trace("removeMasterKey: keySeq = " + keySeq); - } - return masterKeys.remove(keySeq) != null; - } - - @Override - public String[] getMasterKeys() { - return masterKeys.values().toArray(new String[0]); - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) { - DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token); - if (LOG.isTraceEnabled()) { - LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + (tokenInfo == null)); - } - return (tokenInfo == null); - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { - DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier); - if (LOG.isTraceEnabled()) { - LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null)); - } - return tokenInfo != null; - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { - DelegationTokenInformation result = tokens.get(tokenIdentifier); - if (LOG.isTraceEnabled()) { - LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); - } - return result; - } - - @Override - public List getAllDelegationTokenIdentifiers() { - List result = new ArrayList( - tokens.size()); - for (DelegationTokenIdentifier id : tokens.keySet()) { - result.add(id); - } - return result; - } - - @Override - public void close() throws IOException { - //no-op - } - - @Override - public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { - // no-op - } -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java deleted file mode 100644 index 87b418e..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.thrift; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; -import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory - * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.). - * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not - * cached in memory. This avoids complexities related to token expiration. The security token is - * needed only at the time the transport is opened (as opposed to per interface operation). The - * assumption therefore is low cost of interprocess token retrieval (for random read efficient store - * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches. - * The wrapper incorporates the token store abstraction within the limitations of current - * Hive/Hadoop dependency (.20S) with minimum code duplication. - * Eventually this should be supported by Hadoop security directly. 
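The javadoc above is the crux of the design: token metadata lives in the external store, not in the secret manager's memory. The DelegationTokenInformation payload crosses that boundary through the HiveDelegationTokenSupport helper, exactly as DBTokenStore and ZooKeeperTokenStore do elsewhere in this patch. A minimal round-trip sketch (illustrative, not patch code):

import java.io.IOException;

import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;

public class TokenInfoCodecSketch {
  // Serializes token metadata the way the external stores persist it, then
  // decodes it back; the helper lives in the Hadoop package so it can reach
  // DelegationTokenInformation's package-private state.
  public static DelegationTokenInformation roundTrip(DelegationTokenInformation info)
      throws IOException {
    byte[] bytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(info);
    return HiveDelegationTokenSupport.decodeDelegationTokenInformation(bytes);
  }
}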
- */ -public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager { - - private static final Logger LOGGER = - LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName()); - - final private long keyUpdateInterval; - final private long tokenRemoverScanInterval; - private Thread tokenRemoverThread; - - final private DelegationTokenStore tokenStore; - - public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, - long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval, - DelegationTokenStore sharedStore) { - super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, - delegationTokenRemoverScanInterval); - this.keyUpdateInterval = delegationKeyUpdateInterval; - this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; - - this.tokenStore = sharedStore; - } - - protected DelegationTokenIdentifier getTokenIdentifier(Token token) - throws IOException { - // turn bytes back into identifier for cache lookup - ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); - DataInputStream in = new DataInputStream(buf); - DelegationTokenIdentifier id = createIdentifier(); - id.readFields(in); - return id; - } - - protected Map reloadKeys() { - // read keys from token store - String[] allKeys = tokenStore.getMasterKeys(); - Map keys - = new HashMap(allKeys.length); - for (String keyStr : allKeys) { - DelegationKey key = new DelegationKey(); - try { - decodeWritable(key, keyStr); - keys.put(key.getKeyId(), key); - } catch (IOException ex) { - LOGGER.error("Failed to load master key.", ex); - } - } - synchronized (this) { - super.allKeys.clear(); - super.allKeys.putAll(keys); - } - return keys; - } - - @Override - public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken { - DelegationTokenInformation info = this.tokenStore.getToken(identifier); - if (info == null) { - throw new InvalidToken("token expired or does not exist: " + identifier); - } - // must reuse super as info.getPassword is not accessible - synchronized (this) { - try { - super.currentTokens.put(identifier, info); - return super.retrievePassword(identifier); - } finally { - super.currentTokens.remove(identifier); - } - } - } - - @Override - public DelegationTokenIdentifier cancelToken(Token token, - String canceller) throws IOException { - DelegationTokenIdentifier id = getTokenIdentifier(token); - LOGGER.info("Token cancelation requested for identifier: "+id); - this.tokenStore.removeToken(id); - return id; - } - - /** - * Create the password and add it to shared store. 
- */ - @Override - protected byte[] createPassword(DelegationTokenIdentifier id) { - byte[] password; - DelegationTokenInformation info; - synchronized (this) { - password = super.createPassword(id); - // add new token to shared store - // need to persist expiration along with password - info = super.currentTokens.remove(id); - if (info == null) { - throw new IllegalStateException("Failed to retrieve token after creation"); - } - } - this.tokenStore.addToken(id, info); - return password; - } - - @Override - public long renewToken(Token token, - String renewer) throws InvalidToken, IOException { - // since renewal is KERBEROS authenticated token may not be cached - final DelegationTokenIdentifier id = getTokenIdentifier(token); - DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id); - if (tokenInfo == null) { - throw new InvalidToken("token does not exist: " + id); // no token found - } - // ensure associated master key is available - if (!super.allKeys.containsKey(id.getMasterKeyId())) { - LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.", - id.getMasterKeyId()); - reloadKeys(); - } - // reuse super renewal logic - synchronized (this) { - super.currentTokens.put(id, tokenInfo); - try { - return super.renewToken(token, renewer); - } finally { - super.currentTokens.remove(id); - } - } - } - - public static String encodeWritable(Writable key) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - key.write(dos); - dos.flush(); - return Base64.encodeBase64URLSafeString(bos.toByteArray()); - } - - public static void decodeWritable(Writable w, String idStr) throws IOException { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr))); - w.readFields(in); - } - - /** - * Synchronize master key updates / sequence generation for multiple nodes. - * NOTE: {@Link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need - * to utilize this "hook" to manipulate the key through the object reference. - * This .20S workaround should cease to exist when Hadoop supports token store. 
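encodeWritable and decodeWritable above define the wire format every store in this patch uses for master keys and identifiers: URL-safe Base64 over the Writable serialization. A self-contained round-trip sketch (the key id and payload are made up; the import refers to the relocated class this patch adds):

import java.io.IOException;

import org.apache.hadoop.hive.token.delegation.TokenStoreDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;

public class KeyCodecSketch {
  public static void main(String[] args) throws IOException {
    DelegationKey key = new DelegationKey(42, System.currentTimeMillis(), "demo-key".getBytes());
    String encoded = TokenStoreDelegationTokenSecretManager.encodeWritable(key);
    DelegationKey decoded = new DelegationKey();
    TokenStoreDelegationTokenSecretManager.decodeWritable(decoded, encoded);
    System.out.println(decoded.getKeyId()); // prints 42
  }
}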
- */ - @Override - protected void logUpdateMasterKey(DelegationKey key) throws IOException { - int keySeq = this.tokenStore.addMasterKey(encodeWritable(key)); - // update key with assigned identifier - DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey()); - String keyStr = encodeWritable(keyWithSeq); - this.tokenStore.updateMasterKey(keySeq, keyStr); - decodeWritable(key, keyStr); - LOGGER.info("New master key with key id={}", key.getKeyId()); - super.logUpdateMasterKey(key); - } - - @Override - public synchronized void startThreads() throws IOException { - try { - // updateCurrentKey needs to be called to initialize the master key - // (there should be a null check added in the future in rollMasterKey) - // updateCurrentKey(); - Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey"); - m.setAccessible(true); - m.invoke(this); - } catch (Exception e) { - throw new IOException("Failed to initialize master key", e); - } - running = true; - tokenRemoverThread = new Daemon(new ExpiredTokenRemover()); - tokenRemoverThread.start(); - } - - @Override - public synchronized void stopThreads() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Stopping expired delegation token remover thread"); - } - running = false; - if (tokenRemoverThread != null) { - tokenRemoverThread.interrupt(); - } - } - - /** - * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager} - * that cannot be reused due to private method access. Logic here can more efficiently - * deal with external token store by only loading into memory the minimum data needed. - */ - protected void removeExpiredTokens() { - long now = System.currentTimeMillis(); - Iterator i = tokenStore.getAllDelegationTokenIdentifiers() - .iterator(); - while (i.hasNext()) { - DelegationTokenIdentifier id = i.next(); - if (now > id.getMaxDate()) { - this.tokenStore.removeToken(id); // no need to look at token info - } else { - // get token info to check renew date - DelegationTokenInformation tokenInfo = tokenStore.getToken(id); - if (tokenInfo != null) { - if (now > tokenInfo.getRenewDate()) { - this.tokenStore.removeToken(id); - } - } - } - } - } - - /** - * Extension of rollMasterKey to remove expired keys from store. - * - * @throws IOException - */ - protected void rollMasterKeyExt() throws IOException { - Map keys = reloadKeys(); - int currentKeyId = super.currentId; - HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this); - List keysAfterRoll = Arrays.asList(getAllKeys()); - for (DelegationKey key : keysAfterRoll) { - keys.remove(key.getKeyId()); - if (key.getKeyId() == currentKeyId) { - tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); - } - } - for (DelegationKey expiredKey : keys.values()) { - LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); - try { - tokenStore.removeMasterKey(expiredKey.getKeyId()); - } catch (Exception e) { - LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e); - } - } - } - - /** - * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access - * restriction (there would not be an need to clone the remove thread if the remove logic was - * protected/extensible). 
- */ - protected class ExpiredTokenRemover extends Thread { - private long lastMasterKeyUpdate; - private long lastTokenCacheCleanup; - - @Override - public void run() { - LOGGER.info("Starting expired delegation token remover thread, " - + "tokenRemoverScanInterval=" + tokenRemoverScanInterval - / (60 * 1000) + " min(s)"); - try { - while (running) { - long now = System.currentTimeMillis(); - if (lastMasterKeyUpdate + keyUpdateInterval < now) { - try { - rollMasterKeyExt(); - lastMasterKeyUpdate = now; - } catch (IOException e) { - LOGGER.error("Master key updating failed. " - + StringUtils.stringifyException(e)); - } - } - if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) { - removeExpiredTokens(); - lastTokenCacheCleanup = now; - } - try { - Thread.sleep(5000); // 5 seconds - } catch (InterruptedException ie) { - LOGGER - .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread " - + ie); - } - } - } catch (Throwable t) { - LOGGER.error("ExpiredTokenRemover thread received unexpected exception. " - + t, t); - } - } - } - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java deleted file mode 100644 index 528e55d..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java +++ /dev/null @@ -1,476 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooDefs.Perms; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ZooKeeper token store implementation. - */ -public class ZooKeeperTokenStore implements DelegationTokenStore { - - private static final Logger LOGGER = - LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); - - protected static final String ZK_SEQ_FORMAT = "%010d"; - private static final String NODE_KEYS = "/keys"; - private static final String NODE_TOKENS = "/tokens"; - - private String rootNode = ""; - private volatile CuratorFramework zkSession; - private String zkConnectString; - private int connectTimeoutMillis; - private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); - - /** - * ACLProvider permissions will be used in case parent dirs need to be created - */ - private final ACLProvider aclDefaultProvider = new ACLProvider() { - - @Override - public List getDefaultAcl() { - return newNodeAcl; - } - - @Override - public List getAclForPath(String path) { - return getDefaultAcl(); - } - }; - - - private ServerMode serverMode; - - private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled" - + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; - - private Configuration conf; - - /** - * Default constructor for dynamic instantiation w/ Configurable - * (ReflectionUtils does not support Configuration constructor injection). 
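For orientation, a sketch of how this store lays out znodes, inferred from the constants above and the rootNode composition in init() further down (the METASTORE suffix and identifier values are illustrative; note that the code concatenates the root znode and the server mode without a separator):

import java.io.IOException;

import org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hive.token.delegation.TokenStoreDelegationTokenSecretManager;
import org.apache.hadoop.io.Text;

public class ZkPathSketch {
  public static void main(String[] args) throws IOException {
    // Root: configured znode (default "/hivedelegation") plus the ServerMode name.
    String rootNode = "/hivedelegation" + "METASTORE";
    // Master keys: persistent-sequential children, ZK_SEQ_FORMAT = "%010d".
    String keyPath = rootNode + "/keys/" + String.format("%010d", 7);
    // Tokens: the node name embeds the Base64-encoded identifier.
    DelegationTokenIdentifier id =
        new DelegationTokenIdentifier(new Text("alice"), new Text("alice"), null);
    String tokenPath = rootNode + "/tokens/"
        + TokenStoreDelegationTokenSecretManager.encodeWritable(id);
    System.out.println(keyPath);
    System.out.println(tokenPath);
  }
}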
- */ - protected ZooKeeperTokenStore() { - } - - private CuratorFramework getSession() { - if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { - synchronized (this) { - if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { - zkSession = - CuratorFrameworkFactory.builder().connectString(zkConnectString) - .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); - zkSession.start(); - } - } - } - return zkSession; - } - - private void setupJAASConfig(Configuration conf) throws IOException { - if (!UserGroupInformation.getLoginUser().isFromKeytab()) { - // The process has not logged in using keytab - // this should be a test mode, can't use keytab to authenticate - // with zookeeper. - LOGGER.warn("Login is not from keytab"); - return; - } - - String principal; - String keytab; - switch (serverMode) { - case METASTORE: - principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); - keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); - break; - case HIVESERVER2: - principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); - keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); - break; - default: - throw new AssertionError("Unexpected server mode " + serverMode); - } - Utils.setZookeeperClientKerberosJaasConfig(principal, keytab); - } - - private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { - String val = conf.get(param); - if (val == null || val.trim().isEmpty()) { - throw new IOException("Configuration parameter " + param + " should be set, " - + WHEN_ZK_DSTORE_MSG); - } - return val; - } - - /** - * Create a path if it does not already exist ("mkdir -p") - * @param path string with '/' separator - * @param acl list of ACL entries - * @throws TokenStoreException - */ - public void ensurePath(String path, List acl) - throws TokenStoreException { - try { - CuratorFramework zk = getSession(); - String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .withACL(acl).forPath(path); - LOGGER.info("Created path: {} ", node); - } catch (KeeperException.NodeExistsException e) { - // node already exists - } catch (Exception e) { - throw new TokenStoreException("Error creating path " + path, e); - } - } - - /** - * Parse ACL permission string, from ZooKeeperMain private method - * @param permString - * @return - */ - public static int getPermFromString(String permString) { - int perm = 0; - for (int i = 0; i < permString.length(); i++) { - switch (permString.charAt(i)) { - case 'r': - perm |= ZooDefs.Perms.READ; - break; - case 'w': - perm |= ZooDefs.Perms.WRITE; - break; - case 'c': - perm |= ZooDefs.Perms.CREATE; - break; - case 'd': - perm |= ZooDefs.Perms.DELETE; - break; - case 'a': - perm |= ZooDefs.Perms.ADMIN; - break; - default: - LOGGER.error("Unknown perm type: " + permString.charAt(i)); - } - } - return perm; - } - - /** - * Parse comma separated list of ACL entries to secure generated nodes, e.g. 
- * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa - * @param aclString - * @return ACL list - */ - public static List parseACLs(String aclString) { - String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); - List acl = new ArrayList(aclComps.length); - for (String a : aclComps) { - if (StringUtils.isBlank(a)) { - continue; - } - a = a.trim(); - // from ZooKeeperMain private method - int firstColon = a.indexOf(':'); - int lastColon = a.lastIndexOf(':'); - if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { - LOGGER.error(a + " does not have the form scheme:id:perm"); - continue; - } - ACL newAcl = new ACL(); - newAcl.setId(new Id(a.substring(0, firstColon), a.substring( - firstColon + 1, lastColon))); - newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); - acl.add(newAcl); - } - return acl; - } - - private void initClientAndPaths() { - if (this.zkSession != null) { - this.zkSession.close(); - } - try { - ensurePath(rootNode + NODE_KEYS, newNodeAcl); - ensurePath(rootNode + NODE_TOKENS, newNodeAcl); - } catch (TokenStoreException e) { - throw e; - } - } - - @Override - public void setConf(Configuration conf) { - if (conf == null) { - throw new IllegalArgumentException("conf is null"); - } - this.conf = conf; - } - - @Override - public Configuration getConf() { - return null; // not required - } - - private Map getAllKeys() throws KeeperException, InterruptedException { - - String masterKeyNode = rootNode + NODE_KEYS; - - // get children of key node - List nodes = zkGetChildren(masterKeyNode); - - // read each child node, add to results - Map result = new HashMap(); - for (String node : nodes) { - String nodePath = masterKeyNode + "/" + node; - byte[] data = zkGetData(nodePath); - if (data != null) { - result.put(getSeq(node), data); - } - } - return result; - } - - private List zkGetChildren(String path) { - CuratorFramework zk = getSession(); - try { - return zk.getChildren().forPath(path); - } catch (Exception e) { - throw new TokenStoreException("Error getting children for " + path, e); - } - } - - private byte[] zkGetData(String nodePath) { - CuratorFramework zk = getSession(); - try { - return zk.getData().forPath(nodePath); - } catch (KeeperException.NoNodeException ex) { - return null; - } catch (Exception e) { - throw new TokenStoreException("Error reading " + nodePath, e); - } - } - - private int getSeq(String path) { - String[] pathComps = path.split("/"); - return Integer.parseInt(pathComps[pathComps.length-1]); - } - - @Override - public int addMasterKey(String s) { - String keysPath = rootNode + NODE_KEYS + "/"; - CuratorFramework zk = getSession(); - String newNode; - try { - newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) - .forPath(keysPath, s.getBytes()); - } catch (Exception e) { - throw new TokenStoreException("Error creating new node with path " + keysPath, e); - } - LOGGER.info("Added key {}", newNode); - return getSeq(newNode); - } - - @Override - public void updateMasterKey(int keySeq, String s) { - CuratorFramework zk = getSession(); - String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); - try { - zk.setData().forPath(keyPath, s.getBytes()); - } catch (Exception e) { - throw new TokenStoreException("Error setting data in " + keyPath, e); - } - } - - @Override - public boolean removeMasterKey(int keySeq) { - String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); - zkDelete(keyPath); - return true; - } - - 
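The format documented above is the same one hive.cluster.delegation.token.store.zookeeper.acl accepts. An illustrative call with hypothetical principals (the import assumes the relocated package introduced by this patch):

import java.util.List;

import org.apache.hadoop.hive.token.delegation.ZooKeeperTokenStore;
import org.apache.zookeeper.data.ACL;

public class AclParseSketch {
  public static void main(String[] args) {
    // Each comma-separated entry is scheme:id:perm; "cdrwa" expands via
    // getPermFromString to CREATE|DELETE|READ|WRITE|ADMIN.
    List<ACL> acls = ZooKeeperTokenStore.parseACLs(
        "sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa");
    System.out.println(acls.size()); // prints 2
  }
}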
private void zkDelete(String path) { - CuratorFramework zk = getSession(); - try { - zk.delete().forPath(path); - } catch (KeeperException.NoNodeException ex) { - // already deleted - } catch (Exception e) { - throw new TokenStoreException("Error deleting " + path, e); - } - } - - @Override - public String[] getMasterKeys() { - try { - Map allKeys = getAllKeys(); - String[] result = new String[allKeys.size()]; - int resultIdx = 0; - for (byte[] keyBytes : allKeys.values()) { - result[resultIdx++] = new String(keyBytes); - } - return result; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); - } - } - - - private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { - try { - return rootNode + NODE_TOKENS + "/" - + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); - } catch (IOException ex) { - throw new TokenStoreException("Failed to encode token identifier", ex); - } - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) { - byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); - String tokenPath = getTokenPath(tokenIdentifier); - CuratorFramework zk = getSession(); - String newNode; - try { - newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) - .forPath(tokenPath, tokenBytes); - } catch (Exception e) { - throw new TokenStoreException("Error creating new node with path " + tokenPath, e); - } - - LOGGER.info("Added token: {}", newNode); - return true; - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { - String tokenPath = getTokenPath(tokenIdentifier); - zkDelete(tokenPath); - return true; - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { - byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); - if(tokenBytes == null) { - // The token is already removed. 
- return null; - } - try { - return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); - } catch (Exception ex) { - throw new TokenStoreException("Failed to decode token", ex); - } - } - - @Override - public List getAllDelegationTokenIdentifiers() { - String containerNode = rootNode + NODE_TOKENS; - final List nodes = zkGetChildren(containerNode); - List result = new java.util.ArrayList( - nodes.size()); - for (String node : nodes) { - DelegationTokenIdentifier id = new DelegationTokenIdentifier(); - try { - TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); - result.add(id); - } catch (Exception e) { - LOGGER.warn("Failed to decode token '{}'", node); - } - } - return result; - } - - @Override - public void close() throws IOException { - if (this.zkSession != null) { - this.zkSession.close(); - } - } - - @Override - public void init(Object hmsHandler, ServerMode smode) { - this.serverMode = smode; - zkConnectString = - conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); - if (zkConnectString == null || zkConnectString.trim().isEmpty()) { - // try alternate config param - zkConnectString = - conf.get( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, - null); - if (zkConnectString == null || zkConnectString.trim().isEmpty()) { - throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " - + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR - + " or " - + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE - + WHEN_ZK_DSTORE_MSG); - } - } - connectTimeoutMillis = - conf.getInt( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, - CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); - String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); - if (StringUtils.isNotBlank(aclStr)) { - this.newNodeAcl = parseACLs(aclStr); - } - rootNode = - conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; - - try { - // Install the JAAS Configuration for the runtime - setupJAASConfig(conf); - } catch (IOException e) { - throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " - + e.getMessage(), e); - } - initClientAndPaths(); - } - -} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DBTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DBTokenStore.java new file mode 100644 index 0000000..1922cc0 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DBTokenStore.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.token.delegation; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DBTokenStore implements DelegationTokenStore { + private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); + + @Override + public int addMasterKey(String s) throws TokenStoreException { + if (LOG.isTraceEnabled()) { + LOG.trace("addMasterKey: s = " + s); + } + return (Integer)invokeOnRawStore("addMasterKey", new Object[]{s},String.class); + } + + @Override + public void updateMasterKey(int keySeq, String s) throws TokenStoreException { + if (LOG.isTraceEnabled()) { + LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); + } + invokeOnRawStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s}, + Integer.class, String.class); + } + + @Override + public boolean removeMasterKey(int keySeq) { + return (Boolean)invokeOnRawStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)}, + Integer.class); + } + + @Override + public String[] getMasterKeys() throws TokenStoreException { + return (String[])invokeOnRawStore("getMasterKeys", new Object[0]); + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) throws TokenStoreException { + + try { + String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + String tokenStr = Base64.encodeBase64URLSafeString( + HiveDelegationTokenSupport.encodeDelegationTokenInformation(token)); + boolean result = (Boolean)invokeOnRawStore("addToken", new Object[] {identifier, tokenStr}, + String.class, String.class); + if (LOG.isTraceEnabled()) { + LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) + throws TokenStoreException { + try { + String tokenStr = (String)invokeOnRawStore("getToken", new Object[] { + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); + DelegationTokenInformation result = null; + if (tokenStr != null) { + result = HiveDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr)); + } + if (LOG.isTraceEnabled()) { + LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException{ + try { + boolean result = (Boolean)invokeOnRawStore("removeToken", new Object[] { + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); + if (LOG.isTraceEnabled()) { + LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = 
" + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ + + List tokenIdents = (List)invokeOnRawStore("getAllTokenIdentifiers", new Object[0]); + List delTokenIdents = new ArrayList(tokenIdents.size()); + + for (String tokenIdent : tokenIdents) { + DelegationTokenIdentifier delToken = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent); + } catch (IOException e) { + throw new TokenStoreException(e); + } + delTokenIdents.add(delToken); + } + return delTokenIdents; + } + + private Object hmsHandler; + + @Override + public void init(Object hms, ServerMode smode) throws TokenStoreException { + this.hmsHandler = hms; + } + + private Object invokeOnRawStore(String methName, Object[] params, Class ... paramTypes) + throws TokenStoreException{ + + try { + Object rawStore = hmsHandler.getClass().getMethod("getMS").invoke(hmsHandler); + return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params); + } catch (IllegalArgumentException e) { + throw new TokenStoreException(e); + } catch (SecurityException e) { + throw new TokenStoreException(e); + } catch (IllegalAccessException e) { + throw new TokenStoreException(e); + } catch (InvocationTargetException e) { + throw new TokenStoreException(e.getCause()); + } catch (NoSuchMethodException e) { + throw new TokenStoreException(e); + } + } + + @Override + public void setConf(Configuration conf) { + // No-op + } + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void close() throws IOException { + // No-op. + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenIdentifier.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenIdentifier.java new file mode 100644 index 0000000..15f5b22 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenIdentifier.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.token.delegation; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; + +/** + * A delegation token identifier that is specific to Hive. + */ +public class DelegationTokenIdentifier + extends AbstractDelegationTokenIdentifier { + public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN"); + + /** + * Create an empty delegation token identifier for reading into. 
+ */ + public DelegationTokenIdentifier() { + } + + /** + * Create a new delegation token identifier + * @param owner the effective username of the token owner + * @param renewer the username of the renewer + * @param realUser the real username of the token owner + */ + public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) { + super(owner, renewer, realUser); + } + + @Override + public Text getKind() { + return HIVE_DELEGATION_KIND; + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenSecretManager.java new file mode 100644 index 0000000..d6e6bc6 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenSecretManager.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.token.delegation; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; + +/** + * A Hive specific delegation token secret manager. + * The secret manager is responsible for generating and accepting the password + * for each token. + */ +public class DelegationTokenSecretManager + extends AbstractDelegationTokenSecretManager { + + /** + * Create a secret manager + * @param delegationKeyUpdateInterval the number of seconds for rolling new + * secret keys. 
+ * @param delegationTokenMaxLifetime the maximum lifetime of the delegation + * tokens + * @param delegationTokenRenewInterval how often the tokens must be renewed + * @param delegationTokenRemoverScanInterval how often the tokens are scanned + * for expired tokens + */ + public DelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, + long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + } + + @Override + public DelegationTokenIdentifier createIdentifier() { + return new DelegationTokenIdentifier(); + } + + public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException { + Token t= new Token(); + t.decodeFromUrlString(tokenStrForm); + String user = UserGroupInformation.getCurrentUser().getUserName(); + cancelToken(t, user); + } + + public synchronized long renewDelegationToken(String tokenStrForm) throws IOException { + Token t= new Token(); + t.decodeFromUrlString(tokenStrForm); + String user = UserGroupInformation.getCurrentUser().getUserName(); + return renewToken(t, user); + } + + public synchronized String getDelegationToken(String renewer) throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Text owner = new Text(ugi.getUserName()); + Text realUser = null; + if (ugi.getRealUser() != null) { + realUser = new Text(ugi.getRealUser().getUserName()); + } + DelegationTokenIdentifier ident = + new DelegationTokenIdentifier(owner, new Text(renewer), realUser); + Token t = new Token( + ident, this); + return t.encodeToUrlString(); + } + + public String getUserFromToken(String tokenStr) throws IOException { + Token delegationToken = new Token(); + delegationToken.decodeFromUrlString(tokenStr); + + ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + return id.getUser().getShortUserName(); + } +} + diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenSelector.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenSelector.java new file mode 100644 index 0000000..c9fa566 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenSelector.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
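An aside on the secret-manager methods above: the "string form" they pass around is Hadoop's URL-safe token encoding, and the identifier earlier pins the token kind to HIVE_DELEGATION_TOKEN, so a client can locate and re-encode the Hive token from its own credentials. A hedged sketch:

import java.io.IOException;

import org.apache.hadoop.hive.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class TokenStringFormSketch {
  // Finds the Hive delegation token among the current UGI's credentials by its
  // kind and returns the URL-safe form accepted by renewDelegationToken(),
  // cancelDelegationToken() and getUserFromToken() above.
  public static String hiveTokenStrForm() throws IOException {
    for (Token<? extends TokenIdentifier> t :
        UserGroupInformation.getCurrentUser().getTokens()) {
      if (DelegationTokenIdentifier.HIVE_DELEGATION_KIND.equals(t.getKind())) {
        return t.encodeToUrlString();
      }
    }
    return null; // no Hive token attached to this UGI
  }
}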
+ */
+
+package org.apache.hadoop.hive.token.delegation;
+
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive.
+ */
+
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+  }
+}
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenStore.java
new file mode 100644
index 0000000..14adbae
--- /dev/null
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/DelegationTokenStore.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.token.delegation;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable, Closeable {
+
+  /**
+   * Exception for internal token store errors that typically cannot be handled by the caller.
+   */
+  public static class TokenStoreException extends RuntimeException {
+    private static final long serialVersionUID = -8693819817623074083L;
+
+    public TokenStoreException(Throwable cause) {
+      super(cause);
+    }
+
+    public TokenStoreException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Add new master key. The token store assigns and returns the sequence number.
+   * Caller needs to use the identifier to update the key (since it is embedded in the key).
+   *
+   * @param s
+   * @return sequence number for new key
+   */
+  int addMasterKey(String s) throws TokenStoreException;
+
+  /**
+   * Update master key (for expiration and setting store assigned sequence within key)
+   * @param keySeq
+   * @param s
+   * @throws TokenStoreException
+   */
+  void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+  /**
+   * Remove key for given id.
+   * @param keySeq
+   * @return false if key no longer present, true otherwise.
+   */
+  boolean removeMasterKey(int keySeq);
+
+  /**
+   * Return all master keys.
+   * @return
+   * @throws TokenStoreException
+   */
+  String[] getMasterKeys() throws TokenStoreException;
+
+  /**
+   * Add token.
If identifier is already present, token won't be added. + * @param tokenIdentifier + * @param token + * @return true if token was added, false for existing identifier + */ + boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) throws TokenStoreException; + + /** + * Get token. Returns null if the token does not exist. + * @param tokenIdentifier + * @return + */ + DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) + throws TokenStoreException; + + /** + * Remove token. Return value can be used by caller to detect concurrency. + * @param tokenIdentifier + * @return true if token was removed, false if it was already removed. + * @throws TokenStoreException + */ + boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException; + + /** + * List of all token identifiers in the store. This is used to remove expired tokens + * and a potential scalability improvement would be to partition by master key id + * @return + */ + List getAllDelegationTokenIdentifiers() throws TokenStoreException; + + /** + * @param hmsHandler ObjectStore used by DBTokenStore + * @param smode Indicate whether this is a metastore or hiveserver2 token store + */ + void init(Object hmsHandler, ServerMode smode); + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/HiveDelegationTokenManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/HiveDelegationTokenManager.java new file mode 100644 index 0000000..c693d50 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/HiveDelegationTokenManager.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.hive.token.delegation; + +import java.io.IOException; +import java.net.InetAddress; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.util.ReflectionUtils; + +public class HiveDelegationTokenManager { + + public static final String DELEGATION_TOKEN_GC_INTERVAL = + "hive.cluster.delegation.token.gc-interval"; + private final static long DELEGATION_TOKEN_GC_INTERVAL_DEFAULT = 3600000; // 1 hour + // Delegation token related keys + public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = + "hive.cluster.delegation.key.update-interval"; + public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = + "hive.cluster.delegation.token.renew-interval"; + public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = + "hive.cluster.delegation.token.max-lifetime"; + public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = + 7*24*60*60*1000; // 7 days + public static final String DELEGATION_TOKEN_STORE_CLS = + "hive.cluster.delegation.token.store.class"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; + // Alternate connect string specification configuration + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = + "hive.zookeeper.quorum"; + + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = + "/hivedelegation"; + + protected DelegationTokenSecretManager secretManager; + private InetAddress remoteAddr; + + + public HiveDelegationTokenManager(InetAddress remoteAddr) { + this.remoteAddr = remoteAddr; + } + + public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode smode) + throws IOException { + long secretKeyInterval = + conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = + conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = + conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + long tokenGcInterval = + conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, DELEGATION_TOKEN_GC_INTERVAL_DEFAULT); + + DelegationTokenStore dts = getTokenStore(conf); + dts.init(hms, smode); + secretManager = + new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, tokenGcInterval, dts); + secretManager.startThreads(); + } + + public String 
getDelegationToken(final String owner, final String renewer) throws IOException, + InterruptedException { + /** + * If the user asking the token is same as the 'owner' then don't do + * any proxy authorization checks. For cases like oozie, where it gets + * a delegation token for another user, we need to make sure oozie is + * authorized to get a delegation token. + */ + // Do all checks on short names + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); + if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { + // in the case of proxy users, the getCurrentUser will return the + // real user (for e.g. oozie) due to the doAs that happened just before the + // server started executing the method getDelegationToken in the MetaStore + ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser()); + ProxyUsers.authorize(ownerUgi, remoteAddr.getHostAddress(), null); + } + return ownerUgi.doAs(new PrivilegedExceptionAction() { + + @Override + public String run() throws IOException { + return secretManager.getDelegationToken(renewer); + } + }); + } + + public String getDelegationTokenWithService(String owner, String renewer, String service) + throws IOException, InterruptedException { + String token = getDelegationToken(owner, renewer); + return Utils.addServiceToToken(token, service); + } + + public long renewDelegationToken(String tokenStrForm) + throws IOException { + return secretManager.renewDelegationToken(tokenStrForm); + } + + public String getUserFromToken(String tokenStr) throws IOException { + return secretManager.getUserFromToken(tokenStr); + } + + public void cancelDelegationToken(String tokenStrForm) throws IOException { + secretManager.cancelDelegationToken(tokenStrForm); + } + + private DelegationTokenStore getTokenStore(Configuration conf) throws IOException { + String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); + if (StringUtils.isBlank(tokenStoreClassName)) { + return new MemoryTokenStore(); + } + try { + Class storeClass = + Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class); + return ReflectionUtils.newInstance(storeClass, conf); + } catch (ClassNotFoundException e) { + throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e); + } + } + + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/MemoryTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/MemoryTokenStore.java new file mode 100644 index 0000000..96eed7b --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/MemoryTokenStore.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
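Stepping back from getDelegationToken above, here is a hedged lifecycle sketch of the new manager on the server side. The hms handler object and user names are placeholders, and a Kerberos login is assumed to have already happened. One observation on this hunk: the old bridge's check that AuthenticationMethod is KERBEROS does not reappear in the relocated getDelegationToken/renewDelegationToken shown here, so that guard presumably now lives with the caller.

import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.hive.token.delegation.HiveDelegationTokenManager;

public class ManagerUsageSketch {
  public static void run(Configuration conf, Object hms)
      throws IOException, InterruptedException {
    HiveDelegationTokenManager mgr =
        new HiveDelegationTokenManager(InetAddress.getLocalHost());
    mgr.startDelegationTokenSecretManager(conf, hms, ServerMode.METASTORE);
    // Owner equals the calling user here, so no proxy-user authorization applies.
    String token = mgr.getDelegationToken("alice", "alice");
    mgr.renewDelegationToken(token);
    mgr.cancelDelegationToken(token);
  }
}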
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.token.delegation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default in-memory token store implementation. + */ +public class MemoryTokenStore implements DelegationTokenStore { + private static final Logger LOG = LoggerFactory.getLogger(MemoryTokenStore.class); + + private final Map<Integer, String> masterKeys + = new ConcurrentHashMap<Integer, String>(); + + private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens + = new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>(); + + private final AtomicInteger masterKeySeq = new AtomicInteger(); + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public int addMasterKey(String s) { + int keySeq = masterKeySeq.getAndIncrement(); + if (LOG.isTraceEnabled()) { + LOG.trace("addMasterKey: s = " + s + ", keySeq = " + keySeq); + } + masterKeys.put(keySeq, s); + return keySeq; + } + + @Override + public void updateMasterKey(int keySeq, String s) { + if (LOG.isTraceEnabled()) { + LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); + } + masterKeys.put(keySeq, s); + } + + @Override + public boolean removeMasterKey(int keySeq) { + if (LOG.isTraceEnabled()) { + LOG.trace("removeMasterKey: keySeq = " + keySeq); + } + return masterKeys.remove(keySeq) != null; + } + + @Override + public String[] getMasterKeys() { + return masterKeys.values().toArray(new String[0]); + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token); + if (LOG.isTraceEnabled()) { + LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", added = " + (tokenInfo == null)); + } + return (tokenInfo == null); + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier); + if (LOG.isTraceEnabled()) { + LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", removed = " + (tokenInfo != null)); + } + return tokenInfo != null; + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + DelegationTokenInformation result = tokens.get(tokenIdentifier); + if (LOG.isTraceEnabled()) { + LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); + } + return result; + } + + @Override + public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() { + List<DelegationTokenIdentifier> result = new ArrayList<DelegationTokenIdentifier>( + tokens.size()); + for (DelegationTokenIdentifier id : tokens.keySet()) { + result.add(id); + } + return result; + } + + @Override + public void close() throws IOException { + //no-op + } + + @Override + public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { + // no-op + } +} diff --git
a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/TokenStoreDelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/TokenStoreDelegationTokenSecretManager.java new file mode 100644 index 0000000..51db69e --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/TokenStoreDelegationTokenSecretManager.java @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.token.delegation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extension of {@link DelegationTokenSecretManager} to support an alternative to the default + * in-memory token management, for fail-over and clustering, through a pluggable token store + * (ZooKeeper etc.). Delegation tokens will be retrieved from the store on demand and (unlike + * base class behavior) not cached in memory. This avoids complexities related to token + * expiration. The security token is needed only at the time the transport is opened (as opposed + * to per interface operation). The assumption therefore is that interprocess token retrieval is + * cheap (for a random-read-efficient store such as ZooKeeper) compared to the overhead of + * synchronizing per-process in-memory token caches. + * The wrapper incorporates the token store abstraction within the limitations of the current + * Hive/Hadoop dependency (.20S) with minimum code duplication. + * Eventually this should be supported by Hadoop security directly.
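+ * <p> + * A minimal wiring sketch (illustrative only; the intervals shown are the defaults that + * {@link HiveDelegationTokenManager} reads from configuration): + * <pre> + * DelegationTokenStore store = new MemoryTokenStore(); + * store.init(null, ServerMode.METASTORE); // no-op for the in-memory store + * TokenStoreDelegationTokenSecretManager mgr = + *     new TokenStoreDelegationTokenSecretManager( + *         24 * 60 * 60 * 1000L,     // key update interval (1 day) + *         7 * 24 * 60 * 60 * 1000L, // token max lifetime (7 days) + *         24 * 60 * 60 * 1000L,     // token renew interval (1 day) + *         60 * 60 * 1000L,          // token gc / remover scan interval (1 hour) + *         store); + * mgr.startThreads(); + * </pre>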
+ */ +public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName()); + + final private long keyUpdateInterval; + final private long tokenRemoverScanInterval; + private Thread tokenRemoverThread; + + final private DelegationTokenStore tokenStore; + + public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, + DelegationTokenStore sharedStore) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, + delegationTokenRemoverScanInterval); + this.keyUpdateInterval = delegationKeyUpdateInterval; + this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; + + this.tokenStore = sharedStore; + } + + protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token) + throws IOException { + // turn bytes back into identifier for cache lookup + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + return id; + } + + public Map<Integer, DelegationKey> reloadKeys() { + // read keys from token store + String[] allKeys = tokenStore.getMasterKeys(); + Map<Integer, DelegationKey> keys + = new HashMap<Integer, DelegationKey>(allKeys.length); + for (String keyStr : allKeys) { + DelegationKey key = new DelegationKey(); + try { + decodeWritable(key, keyStr); + keys.put(key.getKeyId(), key); + } catch (IOException ex) { + LOGGER.error("Failed to load master key.", ex); + } + } + synchronized (this) { + super.allKeys.clear(); + super.allKeys.putAll(keys); + } + return keys; + } + + @Override + public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken { + DelegationTokenInformation info = this.tokenStore.getToken(identifier); + if (info == null) { + throw new InvalidToken("token expired or does not exist: " + identifier); + } + // must reuse super as info.getPassword is not accessible + synchronized (this) { + try { + super.currentTokens.put(identifier, info); + return super.retrievePassword(identifier); + } finally { + super.currentTokens.remove(identifier); + } + } + } + + @Override + public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token, + String canceller) throws IOException { + DelegationTokenIdentifier id = getTokenIdentifier(token); + LOGGER.info("Token cancellation requested for identifier: " + id); + this.tokenStore.removeToken(id); + return id; + } + + /** + * Create the password and add it to shared store.
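+ * The token info (which carries the expiration) is taken out of the superclass's in-memory + * map and persisted to the shared store instead, so any node in the cluster can later + * validate or renew the token.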
+ */ + @Override + protected byte[] createPassword(DelegationTokenIdentifier id) { + byte[] password; + DelegationTokenInformation info; + synchronized (this) { + password = super.createPassword(id); + // add new token to shared store + // need to persist expiration along with password + info = super.currentTokens.remove(id); + if (info == null) { + throw new IllegalStateException("Failed to retrieve token after creation"); + } + } + this.tokenStore.addToken(id, info); + return password; + } + + @Override + public long renewToken(Token<DelegationTokenIdentifier> token, + String renewer) throws InvalidToken, IOException { + // since renewal is KERBEROS-authenticated, the token may not be cached + final DelegationTokenIdentifier id = getTokenIdentifier(token); + DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id); + if (tokenInfo == null) { + throw new InvalidToken("token does not exist: " + id); // no token found + } + // ensure associated master key is available + if (!super.allKeys.containsKey(id.getMasterKeyId())) { + LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.", + id.getMasterKeyId()); + reloadKeys(); + } + // reuse super renewal logic + synchronized (this) { + super.currentTokens.put(id, tokenInfo); + try { + return super.renewToken(token, renewer); + } finally { + super.currentTokens.remove(id); + } + } + } + + public static String encodeWritable(Writable key) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + key.write(dos); + dos.flush(); + return Base64.encodeBase64URLSafeString(bos.toByteArray()); + } + + public static void decodeWritable(Writable w, String idStr) throws IOException { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr))); + w.readFields(in); + } + + /** + * Synchronize master key updates / sequence generation for multiple nodes. + * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need + * to utilize this "hook" to manipulate the key through the object reference. + * This .20S workaround should cease to exist when Hadoop supports token store.
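+ * <p> + * The store assigns the key's sequence number, which is written back into the + * {@link DelegationKey} by round-tripping it through {@link #encodeWritable(Writable)} and + * {@link #decodeWritable(Writable, String)}, so every node derives the same key id from the + * shared store.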
+ */ + @Override + public void logUpdateMasterKey(DelegationKey key) throws IOException { + int keySeq = this.tokenStore.addMasterKey(encodeWritable(key)); + // update key with assigned identifier + DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey()); + String keyStr = encodeWritable(keyWithSeq); + this.tokenStore.updateMasterKey(keySeq, keyStr); + decodeWritable(key, keyStr); + LOGGER.info("New master key with key id={}", key.getKeyId()); + super.logUpdateMasterKey(key); + } + + @Override + public synchronized void startThreads() throws IOException { + try { + // updateCurrentKey needs to be called to initialize the master key + // (there should be a null check added in the future in rollMasterKey) + // updateCurrentKey(); + Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey"); + m.setAccessible(true); + m.invoke(this); + } catch (Exception e) { + throw new IOException("Failed to initialize master key", e); + } + running = true; + tokenRemoverThread = new Daemon(new ExpiredTokenRemover()); + tokenRemoverThread.start(); + } + + @Override + public synchronized void stopThreads() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Stopping expired delegation token remover thread"); + } + running = false; + if (tokenRemoverThread != null) { + tokenRemoverThread.interrupt(); + } + } + + /** + * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager} + * that cannot be reused due to private method access. Logic here can more efficiently + * deal with an external token store by only loading into memory the minimum data needed. + */ + public void removeExpiredTokens() { + long now = System.currentTimeMillis(); + Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers() + .iterator(); + while (i.hasNext()) { + DelegationTokenIdentifier id = i.next(); + if (now > id.getMaxDate()) { + this.tokenStore.removeToken(id); // no need to look at token info + } else { + // get token info to check renew date + DelegationTokenInformation tokenInfo = tokenStore.getToken(id); + if (tokenInfo != null) { + if (now > tokenInfo.getRenewDate()) { + this.tokenStore.removeToken(id); + } + } + } + } + } + + /** + * Extension of rollMasterKey to remove expired keys from store. + * + * @throws IOException + */ + public void rollMasterKeyExt() throws IOException { + Map<Integer, DelegationKey> keys = reloadKeys(); + int currentKeyId = super.currentId; + HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this); + List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys()); + for (DelegationKey key : keysAfterRoll) { + keys.remove(key.getKeyId()); + if (key.getKeyId() == currentKeyId) { + tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); + } + } + for (DelegationKey expiredKey : keys.values()) { + LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); + try { + tokenStore.removeMasterKey(expiredKey.getKeyId()); + } catch (Exception e) { + LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e); + } + } + } + + /** + * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access + * restriction (there would not be a need to clone the remove thread if the remove logic was + * protected/extensible).
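+ * <p> + * In operation the thread wakes every five seconds, rolls the master key via + * rollMasterKeyExt() once per keyUpdateInterval, and calls removeExpiredTokens() once per + * tokenRemoverScanInterval.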
+ */ + protected class ExpiredTokenRemover extends Thread { + private long lastMasterKeyUpdate; + private long lastTokenCacheCleanup; + + @Override + public void run() { + LOGGER.info("Starting expired delegation token remover thread, " + + "tokenRemoverScanInterval=" + tokenRemoverScanInterval + / (60 * 1000) + " min(s)"); + try { + while (running) { + long now = System.currentTimeMillis(); + if (lastMasterKeyUpdate + keyUpdateInterval < now) { + try { + rollMasterKeyExt(); + lastMasterKeyUpdate = now; + } catch (IOException e) { + LOGGER.error("Master key updating failed. " + + StringUtils.stringifyException(e)); + } + } + if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) { + removeExpiredTokens(); + lastTokenCacheCleanup = now; + } + try { + Thread.sleep(5000); // 5 seconds + } catch (InterruptedException ie) { + LOGGER.error("InterruptedException received for ExpiredTokenRemover thread " + + ie); + } + } + } catch (Throwable t) { + LOGGER.error("ExpiredTokenRemover thread received unexpected exception. " + + t, t); + } + } + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/ZooKeeperTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/ZooKeeperTokenStore.java new file mode 100644 index 0000000..c4b4935 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/token/delegation/ZooKeeperTokenStore.java @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.token.delegation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper token store implementation. + */ +public class ZooKeeperTokenStore implements DelegationTokenStore { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); + + protected static final String ZK_SEQ_FORMAT = "%010d"; + private static final String NODE_KEYS = "/keys"; + private static final String NODE_TOKENS = "/tokens"; + + private String rootNode = ""; + private volatile CuratorFramework zkSession; + private String zkConnectString; + private int connectTimeoutMillis; + private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); + + /** + * ACLProvider permissions will be used in case parent dirs need to be created + */ + private final ACLProvider aclDefaultProvider = new ACLProvider() { + + @Override + public List<ACL> getDefaultAcl() { + return newNodeAcl; + } + + @Override + public List<ACL> getAclForPath(String path) { + return getDefaultAcl(); + } + }; + + + private ServerMode serverMode; + + private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled " + + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; + + private Configuration conf; + + /** + * Default constructor for dynamic instantiation w/ Configurable + * (ReflectionUtils does not support Configuration constructor injection).
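+ * <p> + * Typically selected through configuration rather than constructed directly; a sketch + * (the quorum string is a placeholder): + * <pre> + * conf.set("hive.cluster.delegation.token.store.class", + *     "org.apache.hadoop.hive.token.delegation.ZooKeeperTokenStore"); + * conf.set("hive.cluster.delegation.token.store.zookeeper.connectString", + *     "zk1:2181,zk2:2181,zk3:2181"); + * </pre>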
+ */ + protected ZooKeeperTokenStore() { + } + + private CuratorFramework getSession() { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + synchronized (this) { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + zkSession = + CuratorFrameworkFactory.builder().connectString(zkConnectString) + .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zkSession.start(); + } + } + } + return zkSession; + } + + private void setupJAASConfig(Configuration conf) throws IOException { + if (!UserGroupInformation.getLoginUser().isFromKeytab()) { + // The process has not logged in using a keytab; this should be a test mode, + // so we can't use a keytab to authenticate with zookeeper. + LOGGER.warn("Login is not from keytab"); + return; + } + + String principal; + String keytab; + switch (serverMode) { + case METASTORE: + principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); + break; + case HIVESERVER2: + principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); + break; + default: + throw new AssertionError("Unexpected server mode " + serverMode); + } + Utils.setZookeeperClientKerberosJaasConfig(principal, keytab); + } + + private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { + String val = conf.get(param); + if (val == null || val.trim().isEmpty()) { + throw new IOException("Configuration parameter " + param + " should be set, " + + WHEN_ZK_DSTORE_MSG); + } + return val; + } + + /** + * Create a path if it does not already exist ("mkdir -p") + * @param path string with '/' separator + * @param acl list of ACL entries + * @throws TokenStoreException + */ + public void ensurePath(String path, List<ACL> acl) + throws TokenStoreException { + try { + CuratorFramework zk = getSession(); + String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(acl).forPath(path); + LOGGER.info("Created path: {} ", node); + } catch (KeeperException.NodeExistsException e) { + // node already exists + } catch (Exception e) { + throw new TokenStoreException("Error creating path " + path, e); + } + } + + /** + * Parse an ACL permission string (adapted from a ZooKeeperMain private method). + * @param permString permission string such as "cdrwa" + * @return ZooKeeper permission bit mask + */ + public static int getPermFromString(String permString) { + int perm = 0; + for (int i = 0; i < permString.length(); i++) { + switch (permString.charAt(i)) { + case 'r': + perm |= ZooDefs.Perms.READ; + break; + case 'w': + perm |= ZooDefs.Perms.WRITE; + break; + case 'c': + perm |= ZooDefs.Perms.CREATE; + break; + case 'd': + perm |= ZooDefs.Perms.DELETE; + break; + case 'a': + perm |= ZooDefs.Perms.ADMIN; + break; + default: + LOGGER.error("Unknown perm type: " + permString.charAt(i)); + } + } + return perm; + } + + /** + * Parse comma separated list of ACL entries to secure generated nodes, e.g.
+ * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa + * @param aclString + * @return ACL list + */ + public static List<ACL> parseACLs(String aclString) { + String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); + List<ACL> acl = new ArrayList<ACL>(aclComps.length); + for (String a : aclComps) { + if (StringUtils.isBlank(a)) { + continue; + } + a = a.trim(); + // from ZooKeeperMain private method + int firstColon = a.indexOf(':'); + int lastColon = a.lastIndexOf(':'); + if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { + LOGGER.error(a + " does not have the form scheme:id:perm"); + continue; + } + ACL newAcl = new ACL(); + newAcl.setId(new Id(a.substring(0, firstColon), a.substring( + firstColon + 1, lastColon))); + newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); + acl.add(newAcl); + } + return acl; + } + + private void initClientAndPaths() { + if (this.zkSession != null) { + this.zkSession.close(); + } + try { + ensurePath(rootNode + NODE_KEYS, newNodeAcl); + ensurePath(rootNode + NODE_TOKENS, newNodeAcl); + } catch (TokenStoreException e) { + throw e; + } + } + + @Override + public void setConf(Configuration conf) { + if (conf == null) { + throw new IllegalArgumentException("conf is null"); + } + this.conf = conf; + } + + @Override + public Configuration getConf() { + return null; // not required + } + + private Map<Integer, byte[]> getAllKeys() throws KeeperException, InterruptedException { + + String masterKeyNode = rootNode + NODE_KEYS; + + // get children of key node + List<String> nodes = zkGetChildren(masterKeyNode); + + // read each child node, add to results + Map<Integer, byte[]> result = new HashMap<Integer, byte[]>(); + for (String node : nodes) { + String nodePath = masterKeyNode + "/" + node; + byte[] data = zkGetData(nodePath); + if (data != null) { + result.put(getSeq(node), data); + } + } + return result; + } + + private List<String> zkGetChildren(String path) { + CuratorFramework zk = getSession(); + try { + return zk.getChildren().forPath(path); + } catch (Exception e) { + throw new TokenStoreException("Error getting children for " + path, e); + } + } + + private byte[] zkGetData(String nodePath) { + CuratorFramework zk = getSession(); + try { + return zk.getData().forPath(nodePath); + } catch (KeeperException.NoNodeException ex) { + return null; + } catch (Exception e) { + throw new TokenStoreException("Error reading " + nodePath, e); + } + } + + private int getSeq(String path) { + String[] pathComps = path.split("/"); + return Integer.parseInt(pathComps[pathComps.length-1]); + } + + @Override + public int addMasterKey(String s) { + String keysPath = rootNode + NODE_KEYS + "/"; + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) + .forPath(keysPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + keysPath, e); + } + LOGGER.info("Added key {}", newNode); + return getSeq(newNode); + } + + @Override + public void updateMasterKey(int keySeq, String s) { + CuratorFramework zk = getSession(); + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + try { + zk.setData().forPath(keyPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error setting data in " + keyPath, e); + } + } + + @Override + public boolean removeMasterKey(int keySeq) { + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + zkDelete(keyPath); + return true; + } + + 
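/** + * Best-effort delete: a node that is already gone counts as deleted, while any + * other ZooKeeper failure surfaces as a TokenStoreException. + */ + 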
private void zkDelete(String path) { + CuratorFramework zk = getSession(); + try { + zk.delete().forPath(path); + } catch (KeeperException.NoNodeException ex) { + // already deleted + } catch (Exception e) { + throw new TokenStoreException("Error deleting " + path, e); + } + } + + @Override + public String[] getMasterKeys() { + try { + Map<Integer, byte[]> allKeys = getAllKeys(); + String[] result = new String[allKeys.size()]; + int resultIdx = 0; + for (byte[] keyBytes : allKeys.values()) { + result[resultIdx++] = new String(keyBytes); + } + return result; + } catch (KeeperException ex) { + throw new TokenStoreException(ex); + } catch (InterruptedException ex) { + throw new TokenStoreException(ex); + } + } + + + private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { + try { + return rootNode + NODE_TOKENS + "/" + + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + } catch (IOException ex) { + throw new TokenStoreException("Failed to encode token identifier", ex); + } + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); + String tokenPath = getTokenPath(tokenIdentifier); + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) + .forPath(tokenPath, tokenBytes); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + tokenPath, e); + } + + LOGGER.info("Added token: {}", newNode); + return true; + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + String tokenPath = getTokenPath(tokenIdentifier); + zkDelete(tokenPath); + return true; + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); + if (tokenBytes == null) { + // The token is already removed.
+ return null; + } + try { + return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception ex) { + throw new TokenStoreException("Failed to decode token", ex); + } + } + + @Override + public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() { + String containerNode = rootNode + NODE_TOKENS; + final List<String> nodes = zkGetChildren(containerNode); + List<DelegationTokenIdentifier> result = new ArrayList<DelegationTokenIdentifier>( + nodes.size()); + for (String node : nodes) { + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); + result.add(id); + } catch (Exception e) { + LOGGER.warn("Failed to decode token '{}'", node); + } + } + return result; + } + + @Override + public void close() throws IOException { + if (this.zkSession != null) { + this.zkSession.close(); + } + } + + @Override + public void init(Object hmsHandler, ServerMode smode) { + this.serverMode = smode; + zkConnectString = + conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + // try alternate config param + zkConnectString = + conf.get( + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, + null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + throw new IllegalArgumentException("Zookeeper connect string has to be specified through " + + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR + + " or " + + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE + + " " + WHEN_ZK_DSTORE_MSG); + } + } + connectTimeoutMillis = + conf.getInt( + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, + CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); + String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); + if (StringUtils.isNotBlank(aclStr)) { + this.newNodeAcl = parseACLs(aclStr); + } + rootNode = + conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; + + try { + // Install the JAAS Configuration for the runtime + setupJAASConfig(conf); + } catch (IOException e) { + throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client: " + + e.getMessage(), e); + } + initClientAndPaths(); + } + +}