commit 8c04c12dd358b50da10c1cc4bf743d6dbac30783 Author: Thejas Nair Date: Sat Feb 27 18:32:59 2016 -0800 HIVE-750 (monarch) - HIVE-13169 - HiveServer2: Support delegation token based connection when using http transport diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java index 65a10e3..7800416 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java @@ -70,9 +70,9 @@ protected void tearDown() throws Exception { private Configuration createConf(String zkPath) { Configuration conf = new Configuration(); - conf.set(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:" + conf.set(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "localhost:" + this.zkPort); - conf.set(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath); + conf.set(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, zkPath); return conf; } @@ -80,7 +80,7 @@ public void testTokenStorage() throws Exception { String ZK_PATH = "/zktokenstore-testTokenStorage"; ts = new ZooKeeperTokenStore(); Configuration conf = createConf(ZK_PATH); - conf.set(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa"); + conf.set(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa"); ts.setConf(conf); ts.init(null, ServerMode.METASTORE); @@ -129,7 +129,7 @@ public void testAclNoAuth() throws Exception { String ZK_PATH = "/zktokenstore-testAclNoAuth"; Configuration conf = createConf(ZK_PATH); conf.set( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, "ip:127.0.0.1:r"); ts = new ZooKeeperTokenStore(); @@ -147,7 +147,7 @@ public void testAclInvalid() throws Exception { String aclString = "sasl:hive/host@TEST.DOMAIN:cdrwa, fail-parse-ignored"; Configuration conf = createConf(ZK_PATH); conf.set( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, aclString); List aclList = ZooKeeperTokenStore.parseACLs(aclString); @@ -167,7 +167,7 @@ public void testAclPositive() throws Exception { String ZK_PATH = "/zktokenstore-testAcl"; Configuration conf = createConf(ZK_PATH); conf.set( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, "ip:127.0.0.1:cdrwa,world:anyone:cdrwa"); ts = new ZooKeeperTokenStore(); ts.setConf(conf); diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index f980e05..b66d880 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -291,8 +291,14 @@ private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException { new HttpKerberosRequestInterceptor(sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, getServerHttpUrl(useSsl), assumeSubject, cookieStore, cookieName, useSsl, additionalHttpHeaders); - } - else { + } else { + // Check for delegation token, if present add it in the header + String tokenStr = getClientDelegationToken(sessConfMap); + if (tokenStr != null) { + requestInterceptor = + new HttpTokenAuthInterceptor(tokenStr, cookieStore, cookieName, useSsl, + additionalHttpHeaders); + } else { /** * Add an interceptor to pass username/password in the header. * In https mode, the entire information is encrypted @@ -300,6 +306,7 @@ private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException { requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword(), cookieStore, cookieName, useSsl, additionalHttpHeaders); + } } // Configure http client for cookie based authentication if (isCookieEnabled) { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HttpTokenAuthInterceptor.java b/jdbc/src/java/org/apache/hive/jdbc/HttpTokenAuthInterceptor.java new file mode 100644 index 0000000..207ed9e --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/HttpTokenAuthInterceptor.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import java.util.Map; + +import org.apache.http.HttpRequest; +import org.apache.http.client.CookieStore; +import org.apache.http.protocol.HttpContext; + +/** + * The class is instantiated with the username and password, it is then + * used to add header with these credentials to HTTP requests + * + */ +public class HttpTokenAuthInterceptor extends HttpRequestInterceptorBase { + private String tokenStr; + private static final String HIVE_DELEGATION_TOKEN_HEADER = "X-Hive-Delegation-Token"; + + public HttpTokenAuthInterceptor(String tokenStr, CookieStore cookieStore, String cn, + boolean isSSL, Map additionalHeaders) { + super(cookieStore, cn, isSSL, additionalHeaders); + this.tokenStr = tokenStr; + } + + @Override + protected void addHttpAuthHeader(HttpRequest httpRequest, HttpContext httpContext) + throws Exception { + httpRequest.addHeader(HIVE_DELEGATION_TOKEN_HEADER, tokenStr); + } +} \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 9fc5498..a64bd59 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -205,6 +205,7 @@ import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.hive.thrift.TUGIContainingTransport; +import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -267,6 +268,7 @@ protected DateFormat initialValue() { public static final String PUBLIC = "public"; private static HadoopThriftAuthBridge.Server saslServer; + private static HiveDelegationTokenManager delegationTokenManager; private static boolean useSasl; private static final class ChainedTTransportFactory extends TTransportFactory { @@ -5282,7 +5284,7 @@ public String get_delegation_token(String token_owner, try { ret = HiveMetaStore.getDelegationToken(token_owner, - renewer_kerberos_principal_name); + renewer_kerberos_principal_name, getIpAddress()); } catch (IOException e) { ex = e; throw new MetaException(e.getMessage()); @@ -5797,7 +5799,7 @@ public static Iface newRetryingHMSHandler(String name, HiveConf conf, boolean lo */ public static void cancelDelegationToken(String tokenStrForm ) throws IOException { - saslServer.cancelDelegationToken(tokenStrForm); + delegationTokenManager.cancelDelegationToken(tokenStrForm); } /** @@ -5806,9 +5808,9 @@ public static void cancelDelegationToken(String tokenStrForm * @param renewer * the designated renewer */ - public static String getDelegationToken(String owner, String renewer) + public static String getDelegationToken(String owner, String renewer, String remoteAddr) throws IOException, InterruptedException { - return saslServer.getDelegationToken(owner, renewer); + return delegationTokenManager.getDelegationToken(owner, renewer, remoteAddr); } /** @@ -5826,7 +5828,7 @@ public static boolean isMetaStoreRemote() { */ public static long renewDelegationToken(String tokenStrForm ) throws IOException { - return saslServer.renewDelegationToken(tokenStrForm); + return delegationTokenManager.renewDelegationToken(tokenStrForm); } /** @@ -6017,8 +6019,11 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, saslServer = bridge.createServer( conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE), conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); - // start delegation token manager - saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS(), ServerMode.METASTORE); + // Start delegation token manager + delegationTokenManager = new HiveDelegationTokenManager(); + delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, + ServerMode.METASTORE); + saslServer.setSecretManager(delegationTokenManager.getSecretManager()); transFactory = saslServer.createTransportFactory( MetaStoreUtils.getMetaStoreSaslProperties(conf)); processor = saslServer.wrapProcessor( diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index 56ad357..4b736aa 100644 --- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -18,6 +18,7 @@ package org.apache.hive.service.auth; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -38,11 +39,13 @@ import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.DBTokenStore; +import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.thrift.TProcessorFactory; @@ -90,6 +93,7 @@ public String getAuthName() { public static final String HS2_PROXY_USER = "hive.server2.proxy.user"; public static final String HS2_CLIENT_TOKEN = "hiveserver2ClientToken"; + private HiveDelegationTokenManager delegationTokenManager = null; public HiveAuthFactory(HiveConf conf) throws TTransportException { this.conf = conf; @@ -112,14 +116,17 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException { // rawStore is only necessary for DBTokenStore Object rawStore = null; String tokenStoreClass = conf.getVar(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS); - + delegationTokenManager = new HiveDelegationTokenManager(); + HMSHandler baseHandler = null; if (tokenStoreClass.equals(DBTokenStore.class.getName())) { - HMSHandler baseHandler = new HiveMetaStore.HMSHandler( + baseHandler = new HiveMetaStore.HMSHandler( "new db based metaserver", conf, true); rawStore = baseHandler.getMS(); } + delegationTokenManager.startDelegationTokenSecretManager(conf, rawStore, + ServerMode.HIVESERVER2); + saslServer.setSecretManager(delegationTokenManager.getSecretManager()); - saslServer.startDelegationTokenSecretManager(conf, rawStore, ServerMode.HIVESERVER2); } catch (MetaException|IOException e) { throw new TTransportException("Failed to start token manager", e); @@ -275,14 +282,16 @@ public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, Str } // retrieve delegation token for the given user - public String getDelegationToken(String owner, String renewer) throws HiveSQLException { - if (saslServer == null) { + public String getDelegationToken(String owner, String renewer, String remoteAddr) + throws HiveSQLException { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - String tokenStr = saslServer.getDelegationTokenWithService(owner, renewer, HS2_CLIENT_TOKEN); + String tokenStr = delegationTokenManager.getDelegationTokenWithService(owner, renewer, + HS2_CLIENT_TOKEN, remoteAddr); if (tokenStr == null || tokenStr.isEmpty()) { throw new HiveSQLException( "Received empty retrieving delegation token for user " + owner, "08S01"); @@ -298,12 +307,12 @@ public String getDelegationToken(String owner, String renewer) throws HiveSQLExc // cancel given delegation token public void cancelDelegationToken(String delegationToken) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - saslServer.cancelDelegationToken(delegationToken); + delegationTokenManager.cancelDelegationToken(delegationToken); } catch (IOException e) { throw new HiveSQLException( "Error canceling delegation token " + delegationToken, "08S01", e); @@ -311,25 +320,39 @@ public void cancelDelegationToken(String delegationToken) throws HiveSQLExceptio } public void renewDelegationToken(String delegationToken) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - saslServer.renewDelegationToken(delegationToken); + delegationTokenManager.renewDelegationToken(delegationToken); } catch (IOException e) { throw new HiveSQLException( "Error renewing delegation token " + delegationToken, "08S01", e); } } + public String verifyDelegationToken(String delegationToken) throws HiveSQLException { + if (delegationTokenManager == null) { + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01"); + } + try { + return delegationTokenManager.verifyDelegationToken(delegationToken); + } catch (IOException e) { + String msg = "Error verifying delegation token " + delegationToken; + LOG.error(msg, e); + throw new HiveSQLException(msg, "08S01", e); + } + } + public String getUserFromToken(String delegationToken) throws HiveSQLException { - if (saslServer == null) { + if (delegationTokenManager == null) { throw new HiveSQLException( "Delegation token only supported over kerberos authentication", "08S01"); } try { - return saslServer.getUserFromToken(delegationToken); + return delegationTokenManager.getUserFromToken(delegationToken); } catch (IOException e) { throw new HiveSQLException( "Error extracting user from delegation token " + delegationToken, "08S01", e); diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index a3af7b2..fd08cfc 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -483,7 +483,7 @@ public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory au String owner, String renewer) throws HiveSQLException { String delegationToken = sessionManager.getSession(sessionHandle). getDelegationToken(authFactory, owner, renewer); - LOG.info(sessionHandle + ": getDelegationToken()"); + LOG.info(sessionHandle + ": getDelegationToken()" + " owner: " + owner + ", renewer: " + renewer); return delegationToken; } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 78bab33..7ad9a06 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -739,7 +739,7 @@ public void setIpAddress(String ipAddress) { public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException { HiveAuthFactory.verifyProxyAccess(getUsername(), owner, getIpAddress(), getHiveConf()); - return authFactory.getDelegationToken(owner, renewer); + return authFactory.getDelegationToken(owner, renewer, getIpAddress()); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index 2f07f79..767b35a 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -189,7 +189,7 @@ public void setProxySession(HiveSession proxySession) { @Override public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException { - return authFactory.getDelegationToken(owner, renewer); + return authFactory.getDelegationToken(owner, renewer, getIpAddress()); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 96a71f5..83b2bca 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -121,7 +121,7 @@ public void run() { UserGroupInformation httpUGI = cliService.getHttpUGI(); String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType, - serviceUGI, httpUGI); + serviceUGI, httpUGI, hiveAuthFactory); // Context handler final ServletContextHandler context = new ServletContextHandler( diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java index 56c8cb6..5be0306 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java @@ -42,12 +42,14 @@ import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; import org.apache.hive.service.auth.AuthenticationProviderFactory; import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.HttpAuthUtils; import org.apache.hive.service.auth.HttpAuthenticationException; import org.apache.hive.service.auth.PasswdAuthenticationProvider; +import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.CookieSigner; import org.apache.thrift.TProcessor; @@ -84,13 +86,17 @@ private int cookieMaxAge; private boolean isCookieSecure; private boolean isHttpOnlyCookie; + private final HiveAuthFactory hiveAuthFactory; + private static final String HIVE_DELEGATION_TOKEN_HEADER = "X-Hive-Delegation-Token"; public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory, - String authType, UserGroupInformation serviceUGI, UserGroupInformation httpUGI) { + String authType, UserGroupInformation serviceUGI, UserGroupInformation httpUGI, + HiveAuthFactory hiveAuthFactory) { super(processor, protocolFactory); this.authType = authType; this.serviceUGI = serviceUGI; this.httpUGI = httpUGI; + this.hiveAuthFactory = hiveAuthFactory; this.isCookieAuthEnabled = hiveConf.getBoolVar( ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED); // Initialize the cookie based authentication related variables. @@ -133,7 +139,13 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) if (clientUserName == null) { // For a kerberos setup if (isKerberosAuthMode(authType)) { - clientUserName = doKerberosAuth(request); + String delegationToken = request.getHeader(HIVE_DELEGATION_TOKEN_HEADER); + // Each http request must have an Authorization header + if ((delegationToken != null) && (!delegationToken.isEmpty())) { + clientUserName = doTokenAuth(request, response); + } else { + clientUserName = doKerberosAuth(request); + } } // For password based authentication else { @@ -332,6 +344,16 @@ private String doPasswdAuth(HttpServletRequest request, String authType) return userName; } + private String doTokenAuth(HttpServletRequest request, HttpServletResponse response) + throws HttpAuthenticationException { + String tokenStr = request.getHeader(HIVE_DELEGATION_TOKEN_HEADER); + try { + return hiveAuthFactory.verifyDelegationToken(tokenStr); + } catch (HiveSQLException e) { + throw new HttpAuthenticationException(e); + } + } + /** * Do the GSS-API kerberos authentication. * We already have a logged in subject in the form of serviceUGI, diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java index 19d1fbf..5299e18 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java @@ -58,6 +58,31 @@ public DelegationTokenIdentifier createIdentifier() { return new DelegationTokenIdentifier(); } + /** + * Verify token string + * @param tokenStrForm + * @return user name + * @throws IOException + */ + public synchronized String verifyDelegationToken(String tokenStrForm) throws IOException { + Token t = new Token(); + t.decodeFromUrlString(tokenStrForm); + + DelegationTokenIdentifier id = getTokenIdentifier(t); + verifyToken(id, t.getPassword()); + return id.getUser().getShortUserName(); + } + + protected DelegationTokenIdentifier getTokenIdentifier(Token token) + throws IOException { + // turn bytes back into identifier for cache lookup + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + return id; + } + public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException { Token t= new Token(); t.decodeFromUrlString(tokenStrForm); diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java index d004e40..1d7e97f 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java @@ -46,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.hive.thrift.DelegationTokenSecretManager; import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -290,38 +292,6 @@ static String encodeIdentifier(byte[] identifier) { public enum ServerMode { HIVESERVER2, METASTORE }; - public static final String DELEGATION_TOKEN_GC_INTERVAL = - "hive.cluster.delegation.token.gc-interval"; - private final static long DELEGATION_TOKEN_GC_INTERVAL_DEFAULT = 3600000; // 1 hour - //Delegation token related keys - public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = - "hive.cluster.delegation.key.update-interval"; - public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = - "hive.cluster.delegation.token.renew-interval"; - public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = - "hive.cluster.delegation.token.max-lifetime"; - public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = - 7*24*60*60*1000; // 7 days - public static final String DELEGATION_TOKEN_STORE_CLS = - "hive.cluster.delegation.token.store.class"; - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = - "hive.cluster.delegation.token.store.zookeeper.connectString"; - // alternate connect string specification configuration - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = - "hive.zookeeper.quorum"; - - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = - "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = - "hive.cluster.delegation.token.store.zookeeper.znode"; - public static final String DELEGATION_TOKEN_STORE_ZK_ACL = - "hive.cluster.delegation.token.store.zookeeper.acl"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = - "/hivedelegation"; protected final UserGroupInformation realUgi; protected DelegationTokenSecretManager secretManager; @@ -359,6 +329,10 @@ protected Server(String keytabFile, String principalConf) } } + public void setSecretManager(DelegationTokenSecretManager secretManager) { + this.secretManager = secretManager; + } + /** * Create a TTransportFactory that, upon connection of a client socket, * negotiates a Kerberized SASL transport. The resulting TTransportFactory @@ -408,109 +382,6 @@ public TProcessor wrapNonAssumingProcessor(TProcessor processor) { return new TUGIAssumingProcessor(processor, secretManager, false); } - protected DelegationTokenStore getTokenStore(Configuration conf) - throws IOException { - String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); - if (StringUtils.isBlank(tokenStoreClassName)) { - return new MemoryTokenStore(); - } - try { - Class storeClass = Class - .forName(tokenStoreClassName).asSubclass( - DelegationTokenStore.class); - return ReflectionUtils.newInstance(storeClass, conf); - } catch (ClassNotFoundException e) { - throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, - e); - } - } - - - public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode) - throws IOException{ - long secretKeyInterval = - conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, - DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); - long tokenMaxLifetime = - conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, - DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); - long tokenRenewInterval = - conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, - DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - long tokenGcInterval = conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, - DELEGATION_TOKEN_GC_INTERVAL_DEFAULT); - - DelegationTokenStore dts = getTokenStore(conf); - dts.init(rawStore, smode); - secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, - tokenRenewInterval, - tokenGcInterval, dts); - secretManager.startThreads(); - } - - - public String getDelegationToken(final String owner, final String renewer) - throws IOException, InterruptedException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - //if the user asking the token is same as the 'owner' then don't do - //any proxy authorization checks. For cases like oozie, where it gets - //a delegation token for another user, we need to make sure oozie is - //authorized to get a delegation token. - //Do all checks on short names - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); - if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { - //in the case of proxy users, the getCurrentUser will return the - //real user (for e.g. oozie) due to the doAs that happened just before the - //server started executing the method getDelegationToken in the MetaStore - ownerUgi = UserGroupInformation.createProxyUser(owner, - UserGroupInformation.getCurrentUser()); - InetAddress remoteAddr = getRemoteAddress(); - ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); - } - return ownerUgi.doAs(new PrivilegedExceptionAction() { - - @Override - public String run() throws IOException { - return secretManager.getDelegationToken(renewer); - } - }); - } - - - public String getDelegationTokenWithService(String owner, String renewer, String service) - throws IOException, InterruptedException { - String token = getDelegationToken(owner, renewer); - return Utils.addServiceToToken(token, service); - } - - - public long renewDelegationToken(String tokenStrForm) throws IOException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - return secretManager.renewDelegationToken(tokenStrForm); - } - - - public String getUserFromToken(String tokenStr) throws IOException { - return secretManager.getUserFromToken(tokenStr); - } - - - public void cancelDelegationToken(String tokenStrForm) throws IOException { - secretManager.cancelDelegationToken(tokenStrForm); - } - final static ThreadLocal remoteAddress = new ThreadLocal() { @@ -520,7 +391,6 @@ protected synchronized InetAddress initialValue() { } }; - public InetAddress getRemoteAddress() { return remoteAddress.get(); } diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java new file mode 100644 index 0000000..9ecb0ee --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HiveDelegationTokenManager.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.thrift; + +import java.io.IOException; +import java.net.InetAddress; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.util.ReflectionUtils; + +public class HiveDelegationTokenManager { + + public static final String DELEGATION_TOKEN_GC_INTERVAL = + "hive.cluster.delegation.token.gc-interval"; + private final static long DELEGATION_TOKEN_GC_INTERVAL_DEFAULT = 3600000; // 1 hour + // Delegation token related keys + public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = + "hive.cluster.delegation.key.update-interval"; + public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = + "hive.cluster.delegation.token.renew-interval"; + public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = + "hive.cluster.delegation.token.max-lifetime"; + public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = + 7*24*60*60*1000; // 7 days + public static final String DELEGATION_TOKEN_STORE_CLS = + "hive.cluster.delegation.token.store.class"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; + // Alternate connect string specification configuration + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = + "hive.zookeeper.quorum"; + + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = + "/hivedelegation"; + + protected DelegationTokenSecretManager secretManager; + + public HiveDelegationTokenManager() { + } + + public DelegationTokenSecretManager getSecretManager() { + return secretManager; + } + + public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode smode) + throws IOException { + long secretKeyInterval = + conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = + conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = + conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + long tokenGcInterval = + conf.getLong(DELEGATION_TOKEN_GC_INTERVAL, DELEGATION_TOKEN_GC_INTERVAL_DEFAULT); + + DelegationTokenStore dts = getTokenStore(conf); + dts.init(hms, smode); + secretManager = + new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, tokenGcInterval, dts); + secretManager.startThreads(); + } + + public String getDelegationToken(final String owner, final String renewer, String remoteAddr) + throws IOException, + InterruptedException { + /** + * If the user asking the token is same as the 'owner' then don't do + * any proxy authorization checks. For cases like oozie, where it gets + * a delegation token for another user, we need to make sure oozie is + * authorized to get a delegation token. + */ + // Do all checks on short names + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); + if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { + // in the case of proxy users, the getCurrentUser will return the + // real user (for e.g. oozie) due to the doAs that happened just before the + // server started executing the method getDelegationToken in the MetaStore + ownerUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getCurrentUser()); + ProxyUsers.authorize(ownerUgi, remoteAddr, null); + } + return ownerUgi.doAs(new PrivilegedExceptionAction() { + + @Override + public String run() throws IOException { + return secretManager.getDelegationToken(renewer); + } + }); + } + + public String getDelegationTokenWithService(String owner, String renewer, String service, String remoteAddr) + throws IOException, InterruptedException { + String token = getDelegationToken(owner, renewer, remoteAddr); + return Utils.addServiceToToken(token, service); + } + + public long renewDelegationToken(String tokenStrForm) + throws IOException { + return secretManager.renewDelegationToken(tokenStrForm); + } + + public String getUserFromToken(String tokenStr) throws IOException { + return secretManager.getUserFromToken(tokenStr); + } + + public void cancelDelegationToken(String tokenStrForm) throws IOException { + secretManager.cancelDelegationToken(tokenStrForm); + } + + /** + * Verify token string + * @param tokenStrForm + * @return user name + * @throws IOException + */ + public String verifyDelegationToken(String tokenStrForm) throws IOException { + return secretManager.verifyDelegationToken(tokenStrForm); + } + + private DelegationTokenStore getTokenStore(Configuration conf) throws IOException { + String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); + if (StringUtils.isBlank(tokenStoreClassName)) { + return new MemoryTokenStore(); + } + try { + Class storeClass = + Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class); + return ReflectionUtils.newInstance(storeClass, conf); + } catch (ClassNotFoundException e) { + throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e); + } + } + + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java index 87b418e..abe8cc2 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java @@ -76,16 +76,6 @@ public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval, this.tokenStore = sharedStore; } - protected DelegationTokenIdentifier getTokenIdentifier(Token token) - throws IOException { - // turn bytes back into identifier for cache lookup - ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); - DataInputStream in = new DataInputStream(buf); - DelegationTokenIdentifier id = createIdentifier(); - id.readFields(in); - return id; - } - protected Map reloadKeys() { // read keys from token store String[] allKeys = tokenStore.getMasterKeys(); diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java index 9119d83..2aa5e4a 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java @@ -436,32 +436,32 @@ public void close() throws IOException { public void init(Object objectStore, ServerMode smode) { this.serverMode = smode; zkConnectString = - conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); if (zkConnectString == null || zkConnectString.trim().isEmpty()) { // try alternate config param zkConnectString = conf.get( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null); if (zkConnectString == null || zkConnectString.trim().isEmpty()) { throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " - + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR + + "either " + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR + " or " - + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE + + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE + WHEN_ZK_DSTORE_MSG); } } connectTimeoutMillis = conf.getInt( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); - String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); + String aclStr = conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null); if (StringUtils.isNotBlank(aclStr)) { this.newNodeAcl = parseACLs(aclStr); } rootNode = - conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; + conf.get(HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, + HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; try { // Install the JAAS Configuration for the runtime