diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestThriftHttpCLIService.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestThriftHttpCLIService.java index 57fda94..43f92a3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestThriftHttpCLIService.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestThriftHttpCLIService.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.jdbc.HttpBasicAuthInterceptor; import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; -import org.apache.hive.service.server.HiveServer2; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.thrift.transport.THttpClient; import org.apache.thrift.transport.TTransport; @@ -170,40 +169,6 @@ public void testIncorrectHttpPath() throws Exception { } } - - private void testWithAuthMode(AuthTypes authType) throws Exception { - // Stop and restart HiveServer2 in given incorrect auth mode - stopHiveServer2(); - hiveConf.setVar(ConfVars.HIVE_SERVER2_AUTHENTICATION, authType.toString()); - hiveServer2 = new HiveServer2(); - // HiveServer2 in Http mode will not start using KERBEROS/LDAP/CUSTOM auth types - startHiveServer2WithConf(hiveConf); - - // This will throw an expected exception since Http server is not running - testOpenSessionExpectedException(); - - // Stop and restart back with the original config - stopHiveServer2(); - hiveConf.setVar(ConfVars.HIVE_SERVER2_AUTHENTICATION, AuthTypes.NOSASL.toString()); - hiveServer2 = new HiveServer2(); - startHiveServer2WithConf(hiveConf); - } - - @Test - public void testKerberosMode() throws Exception { - testWithAuthMode(AuthTypes.KERBEROS); - } - - @Test - public void testLDAPMode() throws Exception { - testWithAuthMode(AuthTypes.LDAP); - } - - @Test - public void testCustomMode() throws Exception { - testWithAuthMode(AuthTypes.CUSTOM); - } - private static TTransport createHttpTransport() throws Exception { DefaultHttpClient httpClient = new DefaultHttpClient(); String httpUrl = transportMode + "://" + host + ":" + port + diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index c55aad2..cb54959 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -47,7 +47,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; @@ -74,10 +73,9 @@ import org.apache.hive.service.cli.thrift.TRenewDelegationTokenResp; import org.apache.hive.service.cli.thrift.TSessionHandle; import org.apache.http.HttpRequestInterceptor; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.conn.ssl.SSLContexts; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.impl.client.DefaultHttpClient; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.THttpClient; @@ -196,6 +194,7 @@ public HiveConnection(String uri, Properties info) throws SQLException { } private void openTransport() throws SQLException { + // TODO: Refactor transport creation to a factory, it's getting uber messy here 
transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); try { if (!transport.isOpen()) { @@ -207,8 +206,10 @@ private void openTransport() throws SQLException { } } - private TTransport createHttpTransport() throws SQLException { - CloseableHttpClient httpClient; + private String getServerHttpUrl(boolean useSsl) { + // Create the http/https url + // JDBC driver will set up an https url if ssl is enabled, otherwise http + String schemeName = useSsl ? "https" : "http"; // http path should begin with "/" String httpPath; httpPath = hiveConfMap.get( @@ -216,21 +217,28 @@ private TTransport createHttpTransport() throws SQLException { if(httpPath == null) { httpPath = "/"; } - if(!httpPath.startsWith("/")) { + else if(!httpPath.startsWith("/")) { httpPath = "/" + httpPath; } + return schemeName + "://" + host + ":" + port + httpPath; + } + + private TTransport createHttpTransport() throws SQLException { + DefaultHttpClient httpClient; - boolean useSsl = "true".equalsIgnoreCase(sessConfMap.get(HIVE_USE_SSL)); + boolean useSsl = isSslConnection(); // Create an http client from the configs - httpClient = getHttpClient(useSsl); + try { + httpClient = getHttpClient(useSsl); + } catch (Exception e) { + String msg = "Could not create http connection to " + + jdbcURI + ". " + e.getMessage(); + throw new SQLException(msg, " 08S01", e); + } - // Create the http/https url - // JDBC driver will set up an https url if ssl is enabled, otherwise http - String schemeName = useSsl ? "https" : "http"; - String httpUrl = schemeName + "://" + host + ":" + port + httpPath; try { - transport = new THttpClient(httpUrl, httpClient); + transport = new THttpClient(getServerHttpUrl(useSsl), httpClient); } catch (TTransportException e) { String msg = "Could not create http connection to " + @@ -240,29 +248,53 @@ private TTransport createHttpTransport() throws SQLException { return transport; } - private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException { - // Add an interceptor to pass username/password in the header - // for basic preemtive http authentication at the server - // In https mode, the entire information is encrypted - HttpRequestInterceptor authInterceptor = new HttpBasicAuthInterceptor( - getUserName(), getPasswd()); - if (useSsl) { - String sslTrustStorePath = sessConfMap.get(HIVE_SSL_TRUST_STORE); - String sslTrustStorePassword = sessConfMap.get( - HIVE_SSL_TRUST_STORE_PASSWORD); - KeyStore sslTrustStore; - SSLContext sslContext; - if (sslTrustStorePath == null || sslTrustStorePath.isEmpty()) { - // Create a default client context based on standard JSSE trust material - sslContext = SSLContexts.createDefault(); - } else { - // Pick trust store config from the given path + private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException { + DefaultHttpClient httpClient = new DefaultHttpClient(); + // Request interceptor for any request pre-processing logic + HttpRequestInterceptor requestInterceptor; + // If Kerberos + if (isKerberosAuthMode()) { + if (useSsl) { + String msg = "SSL encryption is currently not supported with " + + "kerberos authentication"; + throw new SQLException(msg, " 08S01"); + } + /** + * Add an interceptor which sets the appropriate header in the request. + * It does the kerberos authentication and get the final service ticket, + * for sending to the server before every request. 
+ */ + requestInterceptor = new HttpKerberosRequestInterceptor( + sessConfMap.get(HIVE_AUTH_PRINCIPAL), host, getServerHttpUrl(false)); + } + else { + /** + * Add an interceptor to pass username/password in the header. + * In https mode, the entire information is encrypted + */ + requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword()); + // Configure httpClient for SSL + if (useSsl) { + String sslTrustStorePath = sessConfMap.get(HIVE_SSL_TRUST_STORE); + String sslTrustStorePassword = sessConfMap.get( + HIVE_SSL_TRUST_STORE_PASSWORD); + KeyStore sslTrustStore; + SSLSocketFactory socketFactory; try { - sslTrustStore = KeyStore.getInstance(HIVE_SSL_TRUST_STORE_TYPE); - sslTrustStore.load(new FileInputStream(sslTrustStorePath), - sslTrustStorePassword.toCharArray()); - sslContext = SSLContexts.custom().loadTrustMaterial( - sslTrustStore).build(); + if (sslTrustStorePath == null || sslTrustStorePath.isEmpty()) { + // Create a default socket factory based on standard JSSE trust material + socketFactory = SSLSocketFactory.getSocketFactory(); + } + else { + // Pick trust store config from the given path + sslTrustStore = KeyStore.getInstance(HIVE_SSL_TRUST_STORE_TYPE); + sslTrustStore.load(new FileInputStream(sslTrustStorePath), + sslTrustStorePassword.toCharArray()); + socketFactory = new SSLSocketFactory(sslTrustStore); + } + socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + Scheme sslScheme = new Scheme("https", 443, socketFactory); + httpClient.getConnectionManager().getSchemeRegistry().register(sslScheme); } catch (Exception e) { String msg = "Could not create an https connection to " + @@ -270,13 +302,9 @@ private CloseableHttpClient getHttpClient(Boolean useSsl) throws SQLException { throw new SQLException(msg, " 08S01", e); } } - return HttpClients.custom().setHostnameVerifier(SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER).setSslcontext( - sslContext).addInterceptorFirst(authInterceptor).build(); - } - else { - // Create a plain http client - return HttpClients.custom().addInterceptorFirst(authInterceptor).build(); } + httpClient.addRequestInterceptor(requestInterceptor); + return httpClient; } /** @@ -318,19 +346,12 @@ private TTransport createBinaryTransport() throws SQLException { String tokenStr = getClientDelegationToken(sessConfMap); if (tokenStr != null) { transport = KerberosSaslHelper.getTokenTransport(tokenStr, - host, HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps); + host, HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps); } else { // we are using PLAIN Sasl connection with user/password - String userName = sessConfMap.get(HIVE_AUTH_USER); - if ((userName == null) || userName.isEmpty()) { - userName = HIVE_ANONYMOUS_USER; - } - String passwd = sessConfMap.get(HIVE_AUTH_PASSWD); - if ((passwd == null) || passwd.isEmpty()) { - passwd = HIVE_ANONYMOUS_PASSWD; - } - String useSslStr = sessConfMap.get(HIVE_USE_SSL); - if ("true".equalsIgnoreCase(useSslStr)) { + String userName = getUserName(); + String passwd = getPassword(); + if (isSslConnection()) { // get SSL socket String sslTrustStore = sessConfMap.get(HIVE_SSL_TRUST_STORE); String sslTrustStorePassword = sessConfMap.get(HIVE_SSL_TRUST_STORE_PASSWORD); @@ -338,14 +359,14 @@ private TTransport createBinaryTransport() throws SQLException { transport = HiveAuthFactory.getSSLSocket(host, port, loginTimeout); } else { transport = HiveAuthFactory.getSSLSocket(host, port, loginTimeout, - sslTrustStore, 
sslTrustStorePassword); + sslTrustStore, sslTrustStorePassword); } } else { // get non-SSL socket transport transport = HiveAuthFactory.getSocketTransport(host, port, loginTimeout); } - // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL) - transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport); + // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL) + transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport); } } } else { @@ -362,16 +383,6 @@ private TTransport createBinaryTransport() throws SQLException { return transport; } - - private boolean isHttpTransportMode() { - String transportMode = - hiveConfMap.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); - if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) { - return true; - } - return false; - } - // Lookup the delegation token. First in the connection URL, then Configuration private String getClientDelegationToken(Map jdbcConnConf) throws SQLException { @@ -452,10 +463,28 @@ private String getUserName() { /** * @return password from sessConfMap */ - private String getPasswd() { + private String getPassword() { return getSessionValue(HIVE_AUTH_PASSWD, HIVE_ANONYMOUS_PASSWD); } + private boolean isSslConnection() { + return "true".equalsIgnoreCase(sessConfMap.get(HIVE_USE_SSL)); + } + + private boolean isKerberosAuthMode() { + return !HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE)) + && sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL); + } + + private boolean isHttpTransportMode() { + String transportMode = + hiveConfMap.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); + if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) { + return true; + } + return false; + } + /** * Lookup varName in sessConfMap, if its null or empty return the default * value varDefault @@ -494,7 +523,7 @@ public String getDelegationToken(String owner, String renewer) throws SQLExcepti return tokenResp.getDelegationToken(); } catch (TException e) { throw new SQLException("Could not retrieve token: " + - e.getMessage(), " 08S01", e); + e.getMessage(), " 08S01", e); } } @@ -502,12 +531,12 @@ public void cancelDelegationToken(String tokenStr) throws SQLException { TCancelDelegationTokenReq cancelReq = new TCancelDelegationTokenReq(sessHandle, tokenStr); try { TCancelDelegationTokenResp cancelResp = - client.CancelDelegationToken(cancelReq); + client.CancelDelegationToken(cancelReq); Utils.verifySuccess(cancelResp.getStatus()); return; } catch (TException e) { throw new SQLException("Could not cancel token: " + - e.getMessage(), " 08S01", e); + e.getMessage(), " 08S01", e); } } @@ -515,12 +544,12 @@ public void renewDelegationToken(String tokenStr) throws SQLException { TRenewDelegationTokenReq cancelReq = new TRenewDelegationTokenReq(sessHandle, tokenStr); try { TRenewDelegationTokenResp renewResp = - client.RenewDelegationToken(cancelReq); + client.RenewDelegationToken(cancelReq); Utils.verifySuccess(renewResp.getStatus()); return; } catch (TException e) { throw new SQLException("Could not renew token: " + - e.getMessage(), " 08S01", e); + e.getMessage(), " 08S01", e); } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HttpBasicAuthInterceptor.java b/jdbc/src/java/org/apache/hive/jdbc/HttpBasicAuthInterceptor.java index 66eba1b..dd4f62a 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HttpBasicAuthInterceptor.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HttpBasicAuthInterceptor.java @@ -25,6 +25,7 @@ import 
org.apache.http.HttpRequest; import org.apache.http.HttpRequestInterceptor; import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.auth.AuthSchemeBase; import org.apache.http.impl.auth.BasicScheme; import org.apache.http.protocol.HttpContext; @@ -34,20 +35,22 @@ * */ public class HttpBasicAuthInterceptor implements HttpRequestInterceptor { + UsernamePasswordCredentials credentials; + AuthSchemeBase authScheme; - Header basicAuthHeader; - public HttpBasicAuthInterceptor(String username, String password){ + public HttpBasicAuthInterceptor(String username, String password) { if(username != null){ - UsernamePasswordCredentials creds = new UsernamePasswordCredentials(username, password); - basicAuthHeader = BasicScheme.authenticate(creds, "UTF-8", false); + credentials = new UsernamePasswordCredentials(username, password); } + authScheme = new BasicScheme(); } @Override - public void process(HttpRequest httpRequest, HttpContext httpContext) throws HttpException, IOException { - if(basicAuthHeader != null){ - httpRequest.addHeader(basicAuthHeader); - } + public void process(HttpRequest httpRequest, HttpContext httpContext) + throws HttpException, IOException { + Header basicAuthHeader = authScheme.authenticate( + credentials, httpRequest, httpContext); + httpRequest.addHeader(basicAuthHeader); } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HttpKerberosRequestInterceptor.java b/jdbc/src/java/org/apache/hive/jdbc/HttpKerberosRequestInterceptor.java new file mode 100644 index 0000000..1a613fb --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/HttpKerberosRequestInterceptor.java @@ -0,0 +1,65 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hive.jdbc; + +import java.io.IOException; + +import org.apache.hive.service.auth.HttpAuthUtils; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.protocol.HttpContext; + +/** +* +* Authentication interceptor which adds Base64 encoded payload, +* containing the username and kerberos service ticket, +* to the outgoing http request header. +* +*/ +public class HttpKerberosRequestInterceptor implements HttpRequestInterceptor { + +String principal; +String host; +String serverHttpUrl; + +public HttpKerberosRequestInterceptor(String principal, String host, + String serverHttpUrl) { + this.principal = principal; + this.host = host; + this.serverHttpUrl = serverHttpUrl; +} + +@Override +public void process(HttpRequest httpRequest, HttpContext httpContext) + throws HttpException, IOException { + String kerberosAuthHeader; + try { + // Generate the service ticket for sending to the server. 
+ kerberosAuthHeader = HttpAuthUtils.getKerberosServiceTicket( + principal, host, serverHttpUrl); + // Set the session key token (Base64 encoded) in the headers + httpRequest.addHeader(HttpAuthUtils.AUTHORIZATION + ": " + + HttpAuthUtils.NEGOTIATE + " ", kerberosAuthHeader); + } catch (Exception e) { + throw new HttpException(e.getMessage(), e); + } +} + +} diff --git a/pom.xml b/pom.xml index 6e8a735..94531e8 100644 --- a/pom.xml +++ b/pom.xml @@ -106,8 +106,8 @@ 0.96.0-hadoop1 0.96.0-hadoop2 - 4.3.2 - 4.3.1 + 4.2.5 + 4.2.5 1.9.2 0.3.2 5.5.1 diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index 6759903..551a69d 100644 --- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -32,9 +32,9 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.thrift.ThriftCLIService; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TServerSocket; @@ -55,7 +55,7 @@ KERBEROS("KERBEROS"), CUSTOM("CUSTOM"); - private String authType; // Auth type for SASL + private String authType; AuthTypes(String authType) { this.authType = authType; @@ -69,6 +69,7 @@ public String getAuthName() { private HadoopThriftAuthBridge.Server saslServer = null; private String authTypeStr; + private String transportMode; private final HiveConf conf; public static final String HS2_PROXY_USER = "hive.server2.proxy.user"; @@ -76,40 +77,48 @@ public String getAuthName() { public HiveAuthFactory() throws TTransportException { conf = new HiveConf(); - + transportMode = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); authTypeStr = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION); - if (authTypeStr == null) { - authTypeStr = AuthTypes.NONE.getAuthName(); + + // In http mode we use NOSASL as the default auth type + if (transportMode.equalsIgnoreCase("http")) { + if (authTypeStr == null) { + authTypeStr = AuthTypes.NOSASL.getAuthName(); + } } - if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName()) - && ShimLoader.getHadoopShims().isSecureShimImpl()) { - saslServer = ShimLoader.getHadoopThriftAuthBridge().createServer( - conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB), - conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL) - ); - // start delegation token manager - try { - saslServer.startDelegationTokenSecretManager(conf, null); - } catch (IOException e) { - throw new TTransportException("Failed to start token manager", e); + else { + if (authTypeStr == null) { + authTypeStr = AuthTypes.NONE.getAuthName(); + } + if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName()) + && ShimLoader.getHadoopShims().isSecureShimImpl()) { + saslServer = ShimLoader.getHadoopThriftAuthBridge().createServer( + conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB), + conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL) + ); + // start delegation token manager + try { + saslServer.startDelegationTokenSecretManager(conf, null); + } catch (IOException e) { + throw new TTransportException("Failed to start token manager", e); + } } - } } 
public Map getSaslProperties() { Map saslProps = new HashMap(); SaslQOP saslQOP = - SaslQOP.fromString(conf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP)); + SaslQOP.fromString(conf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP)); // hadoop.rpc.protection being set to a higher level than hive.server2.thrift.rpc.protection // does not make sense in most situations. Log warning message in such cases. Map hadoopSaslProps = ShimLoader.getHadoopThriftAuthBridge(). - getHadoopSaslProperties(conf); + getHadoopSaslProperties(conf); SaslQOP hadoopSaslQOP = SaslQOP.fromString(hadoopSaslProps.get(Sasl.QOP)); if(hadoopSaslQOP.ordinal() > saslQOP.ordinal()) { LOG.warn(MessageFormat.format("\"hadoop.rpc.protection\" is set to higher security level " + - "{0} then {1} which is set to {2}", hadoopSaslQOP.toString(), - ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname, saslQOP.toString())); + "{0} then {1} which is set to {2}", hadoopSaslQOP.toString(), + ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname, saslQOP.toString())); } saslProps.put(Sasl.QOP, saslQOP.toString()); saslProps.put(Sasl.SERVER_AUTH, "true"); @@ -142,10 +151,15 @@ public TTransportFactory getAuthTransFactory() throws LoginException { public TProcessorFactory getAuthProcFactory(ThriftCLIService service) throws LoginException { - if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) { - return KerberosSaslHelper.getKerberosProcessorFactory(saslServer, service); - } else { - return PlainSaslHelper.getPlainProcessorFactory(service); + if (transportMode.equalsIgnoreCase("http")) { + return HttpAuthUtils.getAuthProcFactory(service); + } + else { + if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) { + return KerberosSaslHelper.getKerberosProcessorFactory(saslServer, service); + } else { + return PlainSaslHelper.getPlainProcessorFactory(service); + } } } @@ -161,14 +175,11 @@ public String getIpAddress() { return saslServer != null ? saslServer.getRemoteAddress().toString() : null; } - /* perform kerberos login using the hadoop shim API if the configuration is available */ + // Perform kerberos login using the hadoop shim API if the configuration is available public static void loginFromKeytab(HiveConf hiveConf) throws IOException { String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); - if (principal.isEmpty() && keyTabFile.isEmpty()) { - // no security configuration available - return; - } else if (!principal.isEmpty() && !keyTabFile.isEmpty()) { + if (!principal.isEmpty() && !keyTabFile.isEmpty()) { ShimLoader.getHadoopShims().loginUserFromKeytab(principal, keyTabFile); } else { throw new IOException ("HiveServer2 kerberos principal or keytab is not correctly configured"); @@ -289,7 +300,7 @@ public static void verifyProxyAccess(String realUser, String proxyUser, String i } if (!proxyUser.equalsIgnoreCase(realUser)) { ShimLoader.getHadoopShims(). 
- authorizeProxyAccess(proxyUser, sessionUgi, ipAddress, hiveConf); + authorizeProxyAccess(proxyUser, sessionUgi, ipAddress, hiveConf); } } catch (IOException e) { throw new HiveSQLException("Failed to validate proxy privilage of " + realUser + diff --git a/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java b/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java new file mode 100644 index 0000000..91d7188 --- /dev/null +++ b/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hive.service.auth; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.service.cli.thrift.TCLIService; +import org.apache.hive.service.cli.thrift.TCLIService.Iface; +import org.apache.hive.service.cli.thrift.ThriftCLIService; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.HttpContext; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.transport.TTransport; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSCredential; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import org.ietf.jgss.Oid; + +/** + * + * Utility functions for http mode authentication + * + */ +public class HttpAuthUtils { + + public static final String WWW_AUTHENTICATE = "WWW-Authenticate"; + public static final String AUTHORIZATION = "Authorization"; + public static final String BASIC = "Basic"; + public static final String NEGOTIATE = "Negotiate"; + + public static class HttpCLIServiceProcessorFactory extends TProcessorFactory { + private final ThriftCLIService service; + private final HiveConf hiveConf; + private final boolean isDoAsEnabled; + + public HttpCLIServiceProcessorFactory(ThriftCLIService service) { + super(null); + this.service = service; + this.hiveConf = service.getHiveConf(); + this.isDoAsEnabled = hiveConf.getBoolVar( + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + } + + @Override + public TProcessor getProcessor(TTransport trans) { + TProcessor baseProcessor = new TCLIService.Processor(service); + return isDoAsEnabled ? 
new HttpCLIServiceUGIProcessor(baseProcessor) :
+ baseProcessor;
+ }
+ }
+
+ public static TProcessorFactory getAuthProcFactory(ThriftCLIService service) {
+ return new HttpCLIServiceProcessorFactory(service);
+ }
+
+ /**
+ *
+ * @return Stringified Base64 encoded kerberosAuthHeader on success
+ * @throws GSSException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static String getKerberosServiceTicket(String principal,
+ String host, String serverHttpUrl)
+ throws GSSException, IOException, InterruptedException {
+ UserGroupInformation clientUGI = getClientUGI("kerberos");
+ String serverPrincipal = getServerPrincipal(principal, host);
+ // Uses the Ticket Granting Ticket in the UserGroupInformation
+ return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal,
+ clientUGI.getShortUserName(), serverHttpUrl));
+ }
+
+ /**
+ * Get the server principal and verify that the hostname is present
+ * @return
+ * @throws IOException
+ */
+ private static String getServerPrincipal(String principal, String host)
+ throws IOException {
+ return ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(
+ principal, host);
+ }
+
+ /**
+ * JAAS login to set up the client UserGroupInformation.
+ * Sets up the kerberos Ticket Granting Ticket
+ * in the client UserGroupInformation object
+ * @return Client's UserGroupInformation
+ * @throws IOException
+ */
+ public static UserGroupInformation getClientUGI(String authType)
+ throws IOException {
+ return ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf(authType);
+ }
+
+ /**
+ *
+ * HttpKerberosClientAction
+ *
+ */
+ public static class HttpKerberosClientAction implements
+ PrivilegedExceptionAction {
+ String serverPrincipal;
+ String clientUserName;
+ String serverHttpUrl;
+ private final Base64 base64codec;
+ public static final String HTTP_RESPONSE = "HTTP_RESPONSE";
+ public static final String SERVER_HTTP_URL = "SERVER_HTTP_URL";
+ private final HttpContext httpContext;
+
+ public HttpKerberosClientAction(String serverPrincipal,
+ String clientUserName, String serverHttpUrl) {
+ this.serverPrincipal = serverPrincipal;
+ this.clientUserName = clientUserName;
+ this.serverHttpUrl = serverHttpUrl;
+ this.base64codec = new Base64(0);
+ this.httpContext = new BasicHttpContext();
+ httpContext.setAttribute(SERVER_HTTP_URL, serverHttpUrl);
+ }
+
+ @Override
+ public String run() throws Exception {
+ // This is the Oid for the Kerberos GSS-API mechanism.
+ Oid mechOid = new Oid("1.2.840.113554.1.2.2");
+ // Oid for kerberos principal name
+ Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+ GSSManager manager = GSSManager.getInstance();
+
+ // GSS name for client
+ GSSName clientName = manager.createName(clientUserName, GSSName.NT_USER_NAME);
+ // GSS name for server
+ GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+ // GSS credentials for client
+ GSSCredential clientCreds = manager.createCredential(clientName,
+ GSSCredential.DEFAULT_LIFETIME, mechOid,
+ GSSCredential.INITIATE_ONLY);
+
+ /*
+ * Create a GSSContext for mutual authentication with the
+ * server.
+ * - serverName is the GSSName that represents the server.
+ * - krb5Oid is the Oid that represents the mechanism to
+ * use. The client chooses the mechanism to use.
+ * - clientCreds are the client credentials
+ */
+ GSSContext gssContext = manager.createContext(serverName,
+ mechOid, clientCreds, GSSContext.DEFAULT_LIFETIME);
+
+ // Mutual authentication is not requested
+ gssContext.requestMutualAuth(false);
+
+ // Establish context
+ byte[] inToken = new byte[0];
+ byte[] outToken;
+
+ outToken = gssContext.initSecContext(inToken, 0, inToken.length);
+
+ gssContext.dispose();
+ // Base64 encoded and stringified token for server
+ String authHeaderBase64String = new String(base64codec.encode(outToken));
+ return authHeaderBase64String;
+ }
+ }
+}
diff --git a/service/src/java/org/apache/hive/service/auth/HttpAuthenticationException.java b/service/src/java/org/apache/hive/service/auth/HttpAuthenticationException.java
new file mode 100644
index 0000000..bf6be47
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/auth/HttpAuthenticationException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.service.auth;
+
+public class HttpAuthenticationException extends Exception {
+ static final long serialVersionUID = 0;
+
+ /**
+ * @param cause original exception.
+ */
+ public HttpAuthenticationException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param msg exception message.
+ */
+ public HttpAuthenticationException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg exception message.
+ * @param cause original exception.
+ */
+ public HttpAuthenticationException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+}
diff --git a/service/src/java/org/apache/hive/service/auth/HttpCLIServiceUGIProcessor.java b/service/src/java/org/apache/hive/service/auth/HttpCLIServiceUGIProcessor.java
new file mode 100644
index 0000000..8368938
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/auth/HttpCLIServiceUGIProcessor.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hive.service.auth; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; + +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.service.cli.session.SessionManager; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; + +/** + * + * Wraps the underlying thrift processor's process call, + * to assume the client user's UGI/Subject for the doAs calls. + * Gets the client's username from a threadlocal in SessionManager which is + * set in the ThriftHttpServlet, and constructs a client UGI object from that. + * + */ + +public class HttpCLIServiceUGIProcessor implements TProcessor { + + private final TProcessor underlyingProcessor; + private final HadoopShims shim; + + public HttpCLIServiceUGIProcessor(TProcessor underlyingProcessor) { + this.underlyingProcessor = underlyingProcessor; + this.shim = ShimLoader.getHadoopShims(); + } + + @Override + public boolean process(final TProtocol in, final TProtocol out) throws TException { + /** + * Build the client UGI from threadlocal username [SessionManager.getUserName()]. + * The threadlocal username is set in the ThriftHttpServlet. + */ + UserGroupInformation clientUgi = null; + try { + clientUgi = shim.createRemoteUser(SessionManager.getUserName(), new ArrayList()); + return shim.doAs(clientUgi, new PrivilegedExceptionAction() { + @Override + public Boolean run() { + try { + return underlyingProcessor.process(in, out); + } catch (TException te) { + throw new RuntimeException(te); + } + } + }); + } + catch (RuntimeException rte) { + if (rte.getCause() instanceof TException) { + throw (TException)rte.getCause(); + } + throw rte; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); // unexpected! + } catch (IOException ioe) { + throw new RuntimeException(ioe); // unexpected! 
+ } + } +} diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index bdc943e..403c140 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -34,11 +34,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.CompositeService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; @@ -64,8 +66,7 @@ private HiveConf hiveConf; private SessionManager sessionManager; private IMetaStoreClient metastoreClient; - private String serverUserName = null; - + private UserGroupInformation serviceUGI; public CLIService() { super("CLIService"); @@ -74,21 +75,29 @@ public CLIService() { @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - sessionManager = new SessionManager(); addService(sessionManager); - try { - HiveAuthFactory.loginFromKeytab(hiveConf); - serverUserName = ShimLoader.getHadoopShims(). - getShortUserName(ShimLoader.getHadoopShims().getUGIForConf(hiveConf)); - } catch (IOException e) { - throw new ServiceException("Unable to login to kerberos with given principal/keytab", e); - } catch (LoginException e) { - throw new ServiceException("Unable to login to kerberos with given principal/keytab", e); + /** + * If auth mode is Kerberos, do a kerberos login for the service from the keytab + */ + if (hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase( + HiveAuthFactory.AuthTypes.KERBEROS.toString())) { + try { + HiveAuthFactory.loginFromKeytab(hiveConf); + this.serviceUGI = ShimLoader.getHadoopShims().getUGIForConf(hiveConf); + } catch (IOException e) { + throw new ServiceException("Unable to login to kerberos with given principal/keytab", e); + } catch (LoginException e) { + throw new ServiceException("Unable to login to kerberos with given principal/keytab", e); + } } super.init(hiveConf); } + public UserGroupInformation getServiceUGI() { + return this.serviceUGI; + } + @Override public synchronized void start() { super.start(); @@ -441,7 +450,7 @@ public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory au public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException { sessionManager.getSession(sessionHandle). 
- cancelDelegationToken(authFactory, tokenStr); + cancelDelegationToken(authFactory, tokenStr); LOG.info(sessionHandle + ": cancelDelegationToken()"); } diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 4545d2b..eaf2aa0 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -18,7 +18,6 @@ package org.apache.hive.service.cli.session; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -32,9 +31,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.hooks.HookUtils; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; @@ -104,11 +100,11 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str public SessionHandle openSession(TProtocolVersion protocol, String username, String password, Map sessionConf, boolean withImpersonation, String delegationToken) - throws HiveSQLException { + throws HiveSQLException { HiveSession session; if (withImpersonation) { HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(protocol, username, password, - hiveConf, sessionConf, threadLocalIpAddress.get(), delegationToken); + hiveConf, sessionConf, threadLocalIpAddress.get(), delegationToken); session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); hiveSessionUgi.setProxySession(session); } else { @@ -158,7 +154,7 @@ public static void setIpAddress(String ipAddress) { threadLocalIpAddress.set(ipAddress); } - private void clearIpAddress() { + public static void clearIpAddress() { threadLocalIpAddress.remove(); } @@ -177,7 +173,7 @@ public static void setUserName(String userName) { threadLocalUserName.set(userName); } - private void clearUserName() { + public static void clearUserName() { threadLocalUserName.remove(); } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 0c9ac37..0835770 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -29,8 +29,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.AbstractService; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; @@ -222,16 +220,31 @@ private String getUserName(TOpenSessionReq req) throws HiveSQLException { return getProxyUser(userName, req.getConfiguration(), getIpAddress()); } + /** + * Create a session handle + * @param req + * @param res + * @return + * @throws HiveSQLException + * @throws LoginException + * @throws IOException + */ SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res) throws HiveSQLException, LoginException, 
IOException { - String userName = getUserName(req); - TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION, req.getClient_protocol()); - + TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION, + req.getClient_protocol()); SessionHandle sessionHandle; if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && (userName != null)) { String delegationTokenStr = null; + // In case of http transport mode, we set the thread local username, + // while handling each request (in ThriftHttpServlet), + // since SASL layer is not used in HTTP Kerberos. + if (cliService.getHiveConf().getVar( + ConfVars.HIVE_SERVER2_TRANSPORT_MODE).equalsIgnoreCase("http")) { + userName = SessionManager.getUserName(); + } try { delegationTokenStr = cliService.getDelegationTokenFromMetaStore(userName); } catch (UnsupportedOperationException e) { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index a6ff6ce..cb01cfd 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -20,9 +20,13 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; import org.apache.hive.service.cli.CLIService; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServlet; @@ -59,32 +63,20 @@ public void run() { minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); - String httpPath = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH); - // The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on. - // httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*" - if(httpPath == null || httpPath.equals("")) { - httpPath = "/*"; - } - else { - if(!httpPath.startsWith("/")) { - httpPath = "/" + httpPath; - } - if(httpPath.endsWith("/")) { - httpPath = httpPath + "*"; - } - if(!httpPath.endsWith("/*")) { - httpPath = httpPath + "/*"; - } - } + String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); httpServer = new org.eclipse.jetty.server.Server(); QueuedThreadPool threadPool = new QueuedThreadPool(); threadPool.setMinThreads(minWorkerThreads); threadPool.setMaxThreads(maxWorkerThreads); httpServer.setThreadPool(threadPool); - SelectChannelConnector connector; - Boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL); + + SelectChannelConnector connector = new SelectChannelConnector();; + boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL); String schemeName = useSsl ? 
"https" : "http"; + String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); + // Set during the init phase of HiveServer2 if auth mode is kerberos + UserGroupInformation serviceUGI = cliService.getServiceUGI(); if (useSsl) { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); @@ -97,8 +89,6 @@ public void run() { sslContextFactory.setKeyStorePath(keyStorePath); sslContextFactory.setKeyStorePassword(keyStorePassword); connector = new SslSelectChannelConnector(sslContextFactory); - } else { - connector = new SelectChannelConnector(); } connector.setPort(portNum); @@ -106,14 +96,18 @@ public void run() { connector.setReuseAddress(!Shell.WINDOWS); httpServer.addConnector(connector); - TCLIService.Processor processor = - new TCLIService.Processor(new EmbeddedThriftBinaryCLIService()); + hiveAuthFactory = new HiveAuthFactory(); + TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); + TProcessor processor = processorFactory.getProcessor(null); TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); - TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory); + TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, + authType, serviceUGI); - final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + final ServletContextHandler context = new ServletContextHandler( + ServletContextHandler.SESSIONS); context.setContextPath("/"); + httpServer.setHandler(context); context.addServlet(new ServletHolder(thriftHttpServlet), httpPath); @@ -130,39 +124,53 @@ public void run() { } /** + * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on. + * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*" + * @param httpPath + * @return + */ + private String getHttpPath(String httpPath) { + if(httpPath == null || httpPath.equals("")) { + httpPath = "/*"; + } + else { + if(!httpPath.startsWith("/")) { + httpPath = "/" + httpPath; + } + if(httpPath.endsWith("/")) { + httpPath = httpPath + "*"; + } + if(!httpPath.endsWith("/*")) { + httpPath = httpPath + "/*"; + } + } + return httpPath; + } + + /** * Verify that this configuration is supported by transportMode of HTTP * @param hiveConf */ private static void verifyHttpConfiguration(HiveConf hiveConf) { String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); - // error out if KERBEROS or LDAP mode is being used, it is not supported - if(authType.equalsIgnoreCase(AuthTypes.KERBEROS.toString()) || - authType.equalsIgnoreCase(AuthTypes.LDAP.toString()) || - authType.equalsIgnoreCase(AuthTypes.CUSTOM.toString())) { + // Error out if KERBEROS auth mode is being used and use SSL is also set to true + if(authType.equalsIgnoreCase(AuthTypes.KERBEROS.toString()) && + hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { String msg = ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting of " + - authType + " is currently not supported with " + - ConfVars.HIVE_SERVER2_TRANSPORT_MODE + " setting of http"; + authType + " is not supported with " + + ConfVars.HIVE_SERVER2_USE_SSL + " set to true"; LOG.fatal(msg); throw new RuntimeException(msg); } - // Throw exception here + // Warn that SASL is not used in http mode if(authType.equalsIgnoreCase(AuthTypes.NONE.toString())) { // NONE in case of thrift mode uses SASL LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " + - authType + ". 
SASL is not supported with http transportMode," + + authType + ". SASL is not supported with http transport mode," + " so using equivalent of " + AuthTypes.NOSASL); } - - // doAs is currently not supported with http - if(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - String msg = ConfVars.HIVE_SERVER2_ENABLE_DOAS + " setting of " + - "true is currently not supported with " + - ConfVars.HIVE_SERVER2_TRANSPORT_MODE + " setting of http"; - LOG.fatal(msg); - throw new RuntimeException(msg); - } } } \ No newline at end of file diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java index e77f043..255a165 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.thrift; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -28,61 +29,252 @@ import org.apache.commons.codec.binary.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.service.auth.AuthenticationProviderFactory; +import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods; +import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.auth.HttpAuthUtils; +import org.apache.hive.service.auth.HttpAuthenticationException; +import org.apache.hive.service.auth.PasswdAuthenticationProvider; +import org.apache.hive.service.cli.session.SessionManager; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServlet; +import org.ietf.jgss.GSSContext; +import org.ietf.jgss.GSSCredential; +import org.ietf.jgss.GSSException; +import org.ietf.jgss.GSSManager; +import org.ietf.jgss.GSSName; +import org.ietf.jgss.Oid; +/** + * + * ThriftHttpServlet + * + */ public class ThriftHttpServlet extends TServlet { private static final long serialVersionUID = 1L; public static final Log LOG = LogFactory.getLog(ThriftHttpServlet.class.getName()); + private final String authType; + private final UserGroupInformation serviceUGI; - public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory) { + public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory, + String authType, UserGroupInformation serviceUGI) { super(processor, protocolFactory); + this.authType = authType; + this.serviceUGI = serviceUGI; } @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - logRequestHeader(request); - super.doPost(request, response); - } + String clientUserName; + try { + // For a kerberos setup + if(isKerberosAuthMode(authType)) { + clientUserName = doKerberosAuth(request, serviceUGI); + } + else { + clientUserName = doPasswdAuth(request, authType); + } - protected void logRequestHeader(HttpServletRequest request) { - String authHeaderBase64 = request.getHeader("Authorization"); - if(authHeaderBase64 == null) { - LOG.warn("ThriftHttpServlet: no HTTP Authorization header"); + LOG.info("Client username: " + clientUserName); + + // Set the thread local username to be used for doAs if true + SessionManager.setUserName(clientUserName); + 
super.doPost(request, response); } - else { - if(!authHeaderBase64.startsWith("Basic")) { - LOG.warn("ThriftHttpServlet: HTTP Authorization header exists but is not Basic."); + catch (HttpAuthenticationException e) { + // Send a 403 to the client + LOG.error("Error: ", e); + response.setContentType("application/x-thrift"); + response.setStatus(HttpServletResponse.SC_FORBIDDEN); + // Send the response back to the client + response.getWriter().println("Authentication Error: " + e.getMessage()); + } + finally { + // Clear the thread local username since we set it in each http request + SessionManager.clearUserName(); + } + } + + /** + * Do the LDAP/PAM authentication + * @param request + * @param authType + * @throws HttpAuthenticationException + */ + private String doPasswdAuth(HttpServletRequest request, String authType) + throws HttpAuthenticationException { + String userName = getUsername(request, authType); + // No-op when authType is NOSASL + if (!authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) { + try { + AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType); + PasswdAuthenticationProvider provider = + AuthenticationProviderFactory.getAuthenticationProvider(authMethod); + provider.Authenticate(userName, getPassword(request, authType)); + + } catch (Exception e) { + throw new HttpAuthenticationException(e); } - else if(LOG.isDebugEnabled()) { - String authHeaderBase64_Payload = authHeaderBase64.substring("Basic ".length()); - String authHeaderString = StringUtils.newStringUtf8( - Base64.decodeBase64(authHeaderBase64_Payload.getBytes())); - String[] creds = authHeaderString.split(":"); - String username = null; - String password = null; - - if(creds.length >= 1) { - username = creds[0]; - } - if(creds.length >= 2) { - password = creds[1]; - } - if(password == null || password.equals("null") || password.equals("")) { - password = ""; + } + return userName; + } + + /** + * Do the GSS-API kerberos authentication. + * We already have a logged in subject in the form of serviceUGI, + * which GSS-API will extract information from. + * @param request + * @return + * @throws HttpAuthenticationException + */ + private String doKerberosAuth(HttpServletRequest request, + UserGroupInformation serviceUGI) throws HttpAuthenticationException { + try { + return serviceUGI.doAs(new HttpKerberosServerAction(request, serviceUGI)); + } catch (Exception e) { + throw new HttpAuthenticationException(e); + } + } + + class HttpKerberosServerAction implements PrivilegedExceptionAction { + HttpServletRequest request; + UserGroupInformation serviceUGI; + + HttpKerberosServerAction(HttpServletRequest request, + UserGroupInformation serviceUGI) { + this.request = request; + this.serviceUGI = serviceUGI; + } + + @Override + public String run() throws HttpAuthenticationException { + // Get own Kerberos credentials for accepting connection + GSSManager manager = GSSManager.getInstance(); + GSSContext gssContext = null; + String serverPrincipal = getPrincipalWithoutRealm( + serviceUGI.getUserName()); + try { + // This Oid for Kerberos GSS-API mechanism. 
+ Oid mechOid = new Oid("1.2.840.113554.1.2.2");
+ // Oid for kerberos principal name
+ Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+ // GSS name for server
+ GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+ // GSS credentials for server
+ GSSCredential serverCreds = manager.createCredential(serverName,
+ GSSCredential.DEFAULT_LIFETIME, mechOid, GSSCredential.ACCEPT_ONLY);
+
+ // Create a GSS context
+ gssContext = manager.createContext(serverCreds);
+
+ // Get service ticket from the authorization header
+ String serviceTicketBase64 = getAuthHeader(request, authType);
+ byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+
+ gssContext.acceptSecContext(inToken, 0, inToken.length);
+ // Authenticate or deny based on its context completion
+ if (!gssContext.isEstablished()) {
+ throw new HttpAuthenticationException("Kerberos authentication failed: " +
+ "unable to establish context with the service ticket " +
+ "provided by the client.");
}
else {
- // don't log the actual password.
- password = "******";
+ return getPrincipalWithoutRealm(gssContext.getSrcName().toString());
}
- LOG.debug("HttpServlet: HTTP Authorization header:: username=" + username +
- " password=" + password);
}
+ catch (GSSException e) {
+ throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
+ }
+ finally {
+ if (gssContext != null) {
+ try {
+ gssContext.dispose();
+ } catch (GSSException e) {
+ // No-op
+ }
+ }
+ }
+ }
+
+ private String getPrincipalWithoutRealm(String fullPrincipal) {
+ String names[] = fullPrincipal.split("[@]");
+ return names[0];
+ }
+ }
+
+ private String getUsername(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String creds[] = getAuthHeaderTokens(request, authType);
+ // Username must be present
+ if (creds[0] == null || creds[0].isEmpty()) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client does not contain username.");
+ }
+ return creds[0];
+ }
+
+ private String getPassword(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String creds[] = getAuthHeaderTokens(request, authType);
+ // Password must be present
+ if (creds[1] == null || creds[1].isEmpty()) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client does not contain password."); }
+ return creds[1]; }
+ private String[] getAuthHeaderTokens(HttpServletRequest request,
+ String authType) throws HttpAuthenticationException {
+ String authHeaderBase64 = getAuthHeader(request, authType);
+ String authHeaderString = StringUtils.newStringUtf8(
+ Base64.decodeBase64(authHeaderBase64.getBytes()));
+ String[] creds = authHeaderString.split(":");
+ return creds;
+ }
+
+ /**
+ * Returns the base64 encoded auth header payload
+ * @param request
+ * @param authType
+ * @return
+ * @throws HttpAuthenticationException
+ */
+ private String getAuthHeader(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+ // Each http request must have an Authorization header
+ if (authHeader == null || authHeader.isEmpty()) {
+ throw new HttpAuthenticationException("Authorization header received " +
+ "from the client is empty.");
+ }
+
+ String authHeaderBase64String;
+ int beginIndex;
+ if (isKerberosAuthMode(authType)) {
+ beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+ }
+ else {
+ beginIndex =
(HttpAuthUtils.BASIC + " ").length(); + } + authHeaderBase64String = authHeader.substring(beginIndex); + // Authorization header must have a payload + if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) { + throw new HttpAuthenticationException("Authorization header received " + + "from the client does not contain any data."); + } + return authHeaderBase64String; + } + + private boolean isKerberosAuthMode(String authType) { + return authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString()); + } } + diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java index e3f3e38..9e296de 100644 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java +++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java @@ -66,99 +66,121 @@ import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; - /** - * Functions that bridge Thrift's SASL transports to Hadoop's - * SASL callback handlers and authentication classes. - */ - public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge { - static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class); - - @Override - public Client createClient() { - return new Client(); - } - - @Override - public Client createClientWithConf(String authType) { - Configuration conf = new Configuration(); - conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); - UserGroupInformation.setConfiguration(conf); - return new Client(); - } - - @Override - public Server createServer(String keytabFile, String principalConf) throws TTransportException { - return new Server(keytabFile, principalConf); - } - - /** - * Read and return Hadoop SASL configuration which can be configured using - * "hadoop.rpc.protection" - * @param conf - * @return Hadoop SASL configuration - */ - @Override - public Map getHadoopSaslProperties(Configuration conf) { - // Initialize the SaslRpcServer to ensure QOP parameters are read from conf - SaslRpcServer.init(conf); - return SaslRpcServer.SASL_PROPS; - } - - public static class Client extends HadoopThriftAuthBridge.Client { - /** - * Create a client-side SASL transport that wraps an underlying transport. - * - * @param method The authentication method to use. Currently only KERBEROS is - * supported. - * @param serverPrincipal The Kerberos principal of the target server. - * @param underlyingTransport The underlying transport mechanism, usually a TSocket. - * @param saslProps the sasl properties to create the client with - */ - - @Override - public TTransport createClientTransport( - String principalConfig, String host, - String methodStr, String tokenStrForm, TTransport underlyingTransport, - Map saslProps) throws IOException { - AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); - - TTransport saslTransport = null; - switch (method) { - case DIGEST: - Token t= new Token(); - t.decodeFromUrlString(tokenStrForm); - saslTransport = new TSaslClientTransport( +/** + * Functions that bridge Thrift's SASL transports to Hadoop's + * SASL callback handlers and authentication classes. 
+ */ +public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge { + static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class); + + @Override + public Client createClient() { + return new Client(); + } + + @Override + public Client createClientWithConf(String authType) { + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); + UserGroupInformation.setConfiguration(conf); + return new Client(); + } + + @Override + public Server createServer(String keytabFile, String principalConf) throws TTransportException { + return new Server(keytabFile, principalConf); + } + + @Override + public String getServerPrincipal(String principalConfig, String host) + throws IOException { + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + return serverPrincipal; + } + + @Override + public UserGroupInformation getCurrentUGIWithConf(String authType) + throws IOException { + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authType); + UserGroupInformation.setConfiguration(conf); + return UserGroupInformation.getCurrentUser(); + } + + /** + * Read and return Hadoop SASL configuration which can be configured using + * "hadoop.rpc.protection" + * @param conf + * @return Hadoop SASL configuration + */ + @Override + public Map getHadoopSaslProperties(Configuration conf) { + // Initialize the SaslRpcServer to ensure QOP parameters are read from conf + SaslRpcServer.init(conf); + return SaslRpcServer.SASL_PROPS; + } + + public static class Client extends HadoopThriftAuthBridge.Client { + /** + * Create a client-side SASL transport that wraps an underlying transport. + * + * @param method The authentication method to use. Currently only KERBEROS is + * supported. + * @param serverPrincipal The Kerberos principal of the target server. + * @param underlyingTransport The underlying transport mechanism, usually a TSocket. 
+ * @param saslProps the sasl properties to create the client with + */ + + @Override + public TTransport createClientTransport( + String principalConfig, String host, + String methodStr, String tokenStrForm, TTransport underlyingTransport, + Map saslProps) throws IOException { + AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); + + TTransport saslTransport = null; + switch (method) { + case DIGEST: + Token t= new Token(); + t.decodeFromUrlString(tokenStrForm); + saslTransport = new TSaslClientTransport( method.getMechanismName(), null, null, SaslRpcServer.SASL_DEFAULT_REALM, saslProps, new SaslClientCallbackHandler(t), underlyingTransport); - return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); - - case KERBEROS: - String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); - String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); - if (names.length != 3) { - throw new IOException( - "Kerberos principal name does NOT have the expected hostname part: " - + serverPrincipal); - } - try { - saslTransport = new TSaslClientTransport( - method.getMechanismName(), - null, - names[0], names[1], - saslProps, null, - underlyingTransport); - return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); - } catch (SaslException se) { - throw new IOException("Could not instantiate SASL transport", se); - } - - default: - throw new IOException("Unsupported authentication method: " + method); - } - } + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + + case KERBEROS: + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + try { + saslTransport = new TSaslClientTransport( + method.getMechanismName(), + null, + names[0], names[1], + saslProps, null, + underlyingTransport); + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + } catch (SaslException se) { + throw new IOException("Could not instantiate SASL transport", se); + } + + default: + throw new IOException("Unsupported authentication method: " + method); + } + } private static class SaslClientCallbackHandler implements CallbackHandler { private final String userName; private final char[] userPassword; @@ -168,8 +190,9 @@ public SaslClientCallbackHandler(Token token) { this.userPassword = encodePassword(token.getPassword()); } + @Override public void handle(Callback[] callbacks) - throws UnsupportedCallbackException { + throws UnsupportedCallbackException { NameCallback nc = null; PasswordCallback pc = null; RealmCallback rc = null; @@ -214,253 +237,254 @@ static String encodeIdentifier(byte[] identifier) { static char[] encodePassword(byte[] password) { return new String(Base64.encodeBase64(password)).toCharArray(); - } - } - } - - public static class Server extends HadoopThriftAuthBridge.Server { - final UserGroupInformation realUgi; - DelegationTokenSecretManager secretManager; - private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour - //Delegation token related keys - public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = - "hive.cluster.delegation.key.update-interval"; - public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final 
String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = - "hive.cluster.delegation.token.renew-interval"; - public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = - 24*60*60*1000; // 1 day - public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = - "hive.cluster.delegation.token.max-lifetime"; - public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = - 7*24*60*60*1000; // 7 days - public static final String DELEGATION_TOKEN_STORE_CLS = - "hive.cluster.delegation.token.store.class"; - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = - "hive.cluster.delegation.token.store.zookeeper.connectString"; - public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = - "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = - "hive.cluster.delegation.token.store.zookeeper.znode"; - public static final String DELEGATION_TOKEN_STORE_ZK_ACL = - "hive.cluster.delegation.token.store.zookeeper.acl"; - public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = - "/hive/cluster/delegation"; - - public Server() throws TTransportException { - try { - realUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException ioe) { - throw new TTransportException(ioe); - } - } - /** - * Create a server with a kerberos keytab/principal. - */ - protected Server(String keytabFile, String principalConf) - throws TTransportException { - if (keytabFile == null || keytabFile.isEmpty()) { - throw new TTransportException("No keytab specified"); - } - if (principalConf == null || principalConf.isEmpty()) { - throw new TTransportException("No principal specified"); - } - - // Login from the keytab - String kerberosName; - try { - kerberosName = - SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0"); - UserGroupInformation.loginUserFromKeytab( - kerberosName, keytabFile); - realUgi = UserGroupInformation.getLoginUser(); - assert realUgi.isFromKeytab(); - } catch (IOException ioe) { - throw new TTransportException(ioe); - } - } - - /** - * Create a TTransportFactory that, upon connection of a client socket, - * negotiates a Kerberized SASL transport. The resulting TTransportFactory - * can be passed as both the input and output transport factory when - * instantiating a TThreadPoolServer, for example. - * - * @param saslProps Map of SASL properties - */ - @Override - public TTransportFactory createTransportFactory(Map saslProps) - throws TTransportException { - // Parse out the kerberos principal, host, realm. 
- String kerberosName = realUgi.getUserName(); - final String names[] = SaslRpcServer.splitKerberosName(kerberosName); - if (names.length != 3) { - throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName); - } - - TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); - transFactory.addServerDefinition( - AuthMethod.KERBEROS.getMechanismName(), - names[0], names[1], // two parts of kerberos principal - saslProps, - new SaslRpcServer.SaslGssCallbackHandler()); - transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), + } + } + } + + public static class Server extends HadoopThriftAuthBridge.Server { + final UserGroupInformation realUgi; + DelegationTokenSecretManager secretManager; + private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour + //Delegation token related keys + public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = + "hive.cluster.delegation.key.update-interval"; + public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY = + "hive.cluster.delegation.token.renew-interval"; + public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = + 24*60*60*1000; // 1 day + public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY = + "hive.cluster.delegation.token.max-lifetime"; + public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = + 7*24*60*60*1000; // 7 days + public static final String DELEGATION_TOKEN_STORE_CLS = + "hive.cluster.delegation.token.store.class"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = + "/hive/cluster/delegation"; + + public Server() throws TTransportException { + try { + realUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + /** + * Create a server with a kerberos keytab/principal. + */ + protected Server(String keytabFile, String principalConf) + throws TTransportException { + if (keytabFile == null || keytabFile.isEmpty()) { + throw new TTransportException("No keytab specified"); + } + if (principalConf == null || principalConf.isEmpty()) { + throw new TTransportException("No principal specified"); + } + + // Login from the keytab + String kerberosName; + try { + kerberosName = + SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0"); + UserGroupInformation.loginUserFromKeytab( + kerberosName, keytabFile); + realUgi = UserGroupInformation.getLoginUser(); + assert realUgi.isFromKeytab(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + + /** + * Create a TTransportFactory that, upon connection of a client socket, + * negotiates a Kerberized SASL transport. The resulting TTransportFactory + * can be passed as both the input and output transport factory when + * instantiating a TThreadPoolServer, for example. 
+ * + * @param saslProps Map of SASL properties + */ + @Override + public TTransportFactory createTransportFactory(Map saslProps) + throws TTransportException { + // Parse out the kerberos principal, host, realm. + String kerberosName = realUgi.getUserName(); + final String names[] = SaslRpcServer.splitKerberosName(kerberosName); + if (names.length != 3) { + throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName); + } + + TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); + transFactory.addServerDefinition( + AuthMethod.KERBEROS.getMechanismName(), + names[0], names[1], // two parts of kerberos principal + saslProps, + new SaslRpcServer.SaslGssCallbackHandler()); + transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM, saslProps, new SaslDigestCallbackHandler(secretManager)); - return new TUGIAssumingTransportFactory(transFactory, realUgi); - } - - /** - * Wrap a TProcessor in such a way that, before processing any RPC, it - * assumes the UserGroupInformation of the user authenticated by - * the SASL transport. - */ - @Override - public TProcessor wrapProcessor(TProcessor processor) { - return new TUGIAssumingProcessor(processor, secretManager, true); - } - - /** - * Wrap a TProcessor to capture the client information like connecting userid, ip etc - */ - @Override - public TProcessor wrapNonAssumingProcessor(TProcessor processor) { + return new TUGIAssumingTransportFactory(transFactory, realUgi); + } + + /** + * Wrap a TProcessor in such a way that, before processing any RPC, it + * assumes the UserGroupInformation of the user authenticated by + * the SASL transport. + */ + @Override + public TProcessor wrapProcessor(TProcessor processor) { + return new TUGIAssumingProcessor(processor, secretManager, true); + } + + /** + * Wrap a TProcessor to capture the client information like connecting userid, ip etc + */ + @Override + public TProcessor wrapNonAssumingProcessor(TProcessor processor) { return new TUGIAssumingProcessor(processor, secretManager, false); - } + } protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException { - String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); - if (StringUtils.isBlank(tokenStoreClassName)) { - return new MemoryTokenStore(); - } - try { + String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); + if (StringUtils.isBlank(tokenStoreClassName)) { + return new MemoryTokenStore(); + } + try { Class storeClass = Class .forName(tokenStoreClassName).asSubclass( DelegationTokenStore.class); return ReflectionUtils.newInstance(storeClass, conf); - } catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new IOException("Error initializing delegation token store: " + tokenStoreClassName, e); - } - } - - @Override - public void startDelegationTokenSecretManager(Configuration conf, Object hms) - throws IOException{ - long secretKeyInterval = - conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, - DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); - long tokenMaxLifetime = - conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, - DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); - long tokenRenewInterval = - conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, - DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); - - DelegationTokenStore dts = getTokenStore(conf); - dts.setStore(hms); - secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, - tokenRenewInterval, - 
DELEGATION_TOKEN_GC_INTERVAL, dts); - secretManager.startThreads(); - } - - @Override - public String getDelegationToken(final String owner, final String renewer) - throws IOException, InterruptedException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. " + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - //if the user asking the token is same as the 'owner' then don't do - //any proxy authorization checks. For cases like oozie, where it gets - //a delegation token for another user, we need to make sure oozie is - //authorized to get a delegation token. - //Do all checks on short names - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); - if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { - //in the case of proxy users, the getCurrentUser will return the - //real user (for e.g. oozie) due to the doAs that happened just before the - //server started executing the method getDelegationToken in the MetaStore - ownerUgi = UserGroupInformation.createProxyUser(owner, - UserGroupInformation.getCurrentUser()); - InetAddress remoteAddr = getRemoteAddress(); - ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); - } - return ownerUgi.doAs(new PrivilegedExceptionAction() { - public String run() throws IOException { - return secretManager.getDelegationToken(renewer); - } - }); - } - - @Override - public String getDelegationTokenWithService(String owner, String renewer, String service) - throws IOException, InterruptedException { - String token = getDelegationToken(owner, renewer); - return ShimLoader.getHadoopShims().addServiceToToken(token, service); - } - - @Override - public long renewDelegationToken(String tokenStrForm) throws IOException { - if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { - throw new AuthorizationException( - "Delegation Token can be issued only with kerberos authentication. 
" + - "Current AuthenticationMethod: " + authenticationMethod.get() - ); - } - return secretManager.renewDelegationToken(tokenStrForm); - } - - @Override - public String getUserFromToken(String tokenStr) throws IOException { - return secretManager.getUserFromToken(tokenStr); - } - - @Override - public void cancelDelegationToken(String tokenStrForm) throws IOException { - secretManager.cancelDelegationToken(tokenStrForm); - } - - final static ThreadLocal remoteAddress = - new ThreadLocal() { - @Override - protected synchronized InetAddress initialValue() { - return null; - } - }; - - @Override - public InetAddress getRemoteAddress() { - return remoteAddress.get(); - } - - final static ThreadLocal authenticationMethod = - new ThreadLocal() { - @Override - protected synchronized AuthenticationMethod initialValue() { - return AuthenticationMethod.TOKEN; - } - }; - - private static ThreadLocal remoteUser = new ThreadLocal () { - @Override - protected synchronized String initialValue() { - return null; - } - }; - - @Override - public String getRemoteUser() { - return remoteUser.get(); - } + } + } + + @Override + public void startDelegationTokenSecretManager(Configuration conf, Object hms) + throws IOException{ + long secretKeyInterval = + conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, + DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = + conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY, + DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = + conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY, + DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + + DelegationTokenStore dts = getTokenStore(conf); + dts.setStore(hms); + secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, + tokenMaxLifetime, + tokenRenewInterval, + DELEGATION_TOKEN_GC_INTERVAL, dts); + secretManager.startThreads(); + } + + @Override + public String getDelegationToken(final String owner, final String renewer) + throws IOException, InterruptedException { + if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { + throw new AuthorizationException( + "Delegation Token can be issued only with kerberos authentication. " + + "Current AuthenticationMethod: " + authenticationMethod.get() + ); + } + //if the user asking the token is same as the 'owner' then don't do + //any proxy authorization checks. For cases like oozie, where it gets + //a delegation token for another user, we need to make sure oozie is + //authorized to get a delegation token. + //Do all checks on short names + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner); + if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) { + //in the case of proxy users, the getCurrentUser will return the + //real user (for e.g. 
oozie) due to the doAs that happened just before the + //server started executing the method getDelegationToken in the MetaStore + ownerUgi = UserGroupInformation.createProxyUser(owner, + UserGroupInformation.getCurrentUser()); + InetAddress remoteAddr = getRemoteAddress(); + ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null); + } + return ownerUgi.doAs(new PrivilegedExceptionAction() { + @Override + public String run() throws IOException { + return secretManager.getDelegationToken(renewer); + } + }); + } + + @Override + public String getDelegationTokenWithService(String owner, String renewer, String service) + throws IOException, InterruptedException { + String token = getDelegationToken(owner, renewer); + return ShimLoader.getHadoopShims().addServiceToToken(token, service); + } + + @Override + public long renewDelegationToken(String tokenStrForm) throws IOException { + if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) { + throw new AuthorizationException( + "Delegation Token can be issued only with kerberos authentication. " + + "Current AuthenticationMethod: " + authenticationMethod.get() + ); + } + return secretManager.renewDelegationToken(tokenStrForm); + } + + @Override + public String getUserFromToken(String tokenStr) throws IOException { + return secretManager.getUserFromToken(tokenStr); + } + + @Override + public void cancelDelegationToken(String tokenStrForm) throws IOException { + secretManager.cancelDelegationToken(tokenStrForm); + } + + final static ThreadLocal remoteAddress = + new ThreadLocal() { + @Override + protected synchronized InetAddress initialValue() { + return null; + } + }; + + @Override + public InetAddress getRemoteAddress() { + return remoteAddress.get(); + } + + final static ThreadLocal authenticationMethod = + new ThreadLocal() { + @Override + protected synchronized AuthenticationMethod initialValue() { + return AuthenticationMethod.TOKEN; + } + }; + + private static ThreadLocal remoteUser = new ThreadLocal () { + @Override + protected synchronized String initialValue() { + return null; + } + }; + + @Override + public String getRemoteUser() { + return remoteUser.get(); + } /** CallbackHandler for SASL DIGEST-MD5 mechanism */ // This code is pretty much completely based on Hadoop's @@ -501,12 +525,12 @@ public void handle(Callback[] callbacks) throws InvalidToken, continue; // realm is ignored } else { throw new UnsupportedCallbackException(callback, - "Unrecognized SASL DIGEST-MD5 Callback"); + "Unrecognized SASL DIGEST-MD5 Callback"); } } if (pc != null) { DelegationTokenIdentifier tokenIdentifier = SaslRpcServer. - getIdentifier(nc.getDefaultName(), secretManager); + getIdentifier(nc.getDefaultName(), secretManager); char[] password = getPassword(tokenIdentifier); if (LOG.isDebugEnabled()) { @@ -526,7 +550,7 @@ public void handle(Callback[] callbacks) throws InvalidToken, if (ac.isAuthorized()) { if (LOG.isDebugEnabled()) { String username = - SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName(); + SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName(); LOG.debug("SASL server DIGEST-MD5 callback: setting " + "canonicalized client ID: " + username); } @@ -534,117 +558,120 @@ public void handle(Callback[] callbacks) throws InvalidToken, } } } - } - - /** - * Processor that pulls the SaslServer object out of the transport, and - * assumes the remote user's UGI before calling through to the original - * processor. 
- * - * This is used on the server side to set the UGI for each specific call. - */ - protected class TUGIAssumingProcessor implements TProcessor { - final TProcessor wrapped; - DelegationTokenSecretManager secretManager; - boolean useProxy; - TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager, - boolean useProxy) { - this.wrapped = wrapped; - this.secretManager = secretManager; - this.useProxy = useProxy; - } - - public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { - TTransport trans = inProt.getTransport(); - if (!(trans instanceof TSaslServerTransport)) { - throw new TException("Unexpected non-SASL transport " + trans.getClass()); - } - TSaslServerTransport saslTrans = (TSaslServerTransport)trans; - SaslServer saslServer = saslTrans.getSaslServer(); - String authId = saslServer.getAuthorizationID(); - authenticationMethod.set(AuthenticationMethod.KERBEROS); - LOG.debug("AUTH ID ======>" + authId); - String endUser = authId; - - if(saslServer.getMechanismName().equals("DIGEST-MD5")) { - try { - TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId, - secretManager); - endUser = tokenId.getUser().getUserName(); - authenticationMethod.set(AuthenticationMethod.TOKEN); - } catch (InvalidToken e) { - throw new TException(e.getMessage()); - } - } - Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket(); - remoteAddress.set(socket.getInetAddress()); - UserGroupInformation clientUgi = null; - try { - if (useProxy) { - clientUgi = UserGroupInformation.createProxyUser( - endUser, UserGroupInformation.getLoginUser()); - remoteUser.set(clientUgi.getShortUserName()); - return clientUgi.doAs(new PrivilegedExceptionAction() { - public Boolean run() { - try { - return wrapped.process(inProt, outProt); - } catch (TException te) { - throw new RuntimeException(te); - } - } - }); - } else { - remoteUser.set(endUser); - return wrapped.process(inProt, outProt); - } - } catch (RuntimeException rte) { - if (rte.getCause() instanceof TException) { - throw (TException)rte.getCause(); - } - throw rte; - } catch (InterruptedException ie) { - throw new RuntimeException(ie); // unexpected! - } catch (IOException ioe) { - throw new RuntimeException(ioe); // unexpected! - } - finally { - if (clientUgi != null) { - try { FileSystem.closeAllForUGI(clientUgi); } - catch(IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); + } + + /** + * Processor that pulls the SaslServer object out of the transport, and + * assumes the remote user's UGI before calling through to the original + * processor. + * + * This is used on the server side to set the UGI for each specific call. 
+ */ + protected class TUGIAssumingProcessor implements TProcessor { + final TProcessor wrapped; + DelegationTokenSecretManager secretManager; + boolean useProxy; + TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager, + boolean useProxy) { + this.wrapped = wrapped; + this.secretManager = secretManager; + this.useProxy = useProxy; + } + + @Override + public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { + TTransport trans = inProt.getTransport(); + if (!(trans instanceof TSaslServerTransport)) { + throw new TException("Unexpected non-SASL transport " + trans.getClass()); + } + TSaslServerTransport saslTrans = (TSaslServerTransport)trans; + SaslServer saslServer = saslTrans.getSaslServer(); + String authId = saslServer.getAuthorizationID(); + authenticationMethod.set(AuthenticationMethod.KERBEROS); + LOG.debug("AUTH ID ======>" + authId); + String endUser = authId; + + if(saslServer.getMechanismName().equals("DIGEST-MD5")) { + try { + TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId, + secretManager); + endUser = tokenId.getUser().getUserName(); + authenticationMethod.set(AuthenticationMethod.TOKEN); + } catch (InvalidToken e) { + throw new TException(e.getMessage()); + } + } + Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket(); + remoteAddress.set(socket.getInetAddress()); + UserGroupInformation clientUgi = null; + try { + if (useProxy) { + clientUgi = UserGroupInformation.createProxyUser( + endUser, UserGroupInformation.getLoginUser()); + remoteUser.set(clientUgi.getShortUserName()); + return clientUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() { + try { + return wrapped.process(inProt, outProt); + } catch (TException te) { + throw new RuntimeException(te); + } } + }); + } else { + remoteUser.set(endUser); + return wrapped.process(inProt, outProt); + } + } catch (RuntimeException rte) { + if (rte.getCause() instanceof TException) { + throw (TException)rte.getCause(); + } + throw rte; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); // unexpected! + } catch (IOException ioe) { + throw new RuntimeException(ioe); // unexpected! + } + finally { + if (clientUgi != null) { + try { FileSystem.closeAllForUGI(clientUgi); } + catch(IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception); + } } - } - } - } + } + } + } /** - * A TransportFactory that wraps another one, but assumes a specified UGI - * before calling through. - * - * This is used on the server side to assume the server's Principal when accepting - * clients. - */ - static class TUGIAssumingTransportFactory extends TTransportFactory { - private final UserGroupInformation ugi; - private final TTransportFactory wrapped; - - public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) { - assert wrapped != null; - assert ugi != null; - - this.wrapped = wrapped; - this.ugi = ugi; - } - - @Override - public TTransport getTransport(final TTransport trans) { - return ugi.doAs(new PrivilegedAction() { - public TTransport run() { - return wrapped.getTransport(trans); - } - }); - } - } - } - } + * A TransportFactory that wraps another one, but assumes a specified UGI + * before calling through. + * + * This is used on the server side to assume the server's Principal when accepting + * clients. 
+ */ + static class TUGIAssumingTransportFactory extends TTransportFactory { + private final UserGroupInformation ugi; + private final TTransportFactory wrapped; + + public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) { + assert wrapped != null; + assert ugi != null; + + this.wrapped = wrapped; + this.ugi = ugi; + } + + @Override + public TTransport getTransport(final TTransport trans) { + return ugi.doAs(new PrivilegedAction() { + @Override + public TTransport run() { + return wrapped.getTransport(trans); + } + }); + } + } + } +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 1f24a94..5f00d28 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -80,15 +79,15 @@ * @return TaskAttempt Log Url */ String getTaskAttemptLogUrl(JobConf conf, - String taskTrackerHttpAddress, - String taskAttemptId) - throws MalformedURLException; + String taskTrackerHttpAddress, + String taskAttemptId) + throws MalformedURLException; /** * Returns a shim to wrap MiniMrCluster */ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException; + String nameNode, int numDir) throws IOException; /** * Shim for MiniMrCluster @@ -125,7 +124,7 @@ int createHadoopArchive(Configuration conf, Path parentDir, Path destDir, String archiveName) throws Exception; public URI getHarUri(URI original, URI base, URI originalBase) - throws URISyntaxException; + throws URISyntaxException; /** * Hive uses side effect files exclusively for it's output. It also manages * the setup/cleanup/commit of output from the hive client. As a result it does @@ -165,7 +164,7 @@ public URI getHarUri(URI original, URI base, URI originalBase) * @throws InterruptedException */ public T doAs(UserGroupInformation ugi, PrivilegedExceptionAction pvea) throws - IOException, InterruptedException; + IOException, InterruptedException; /** * Once a delegation token is stored in a file, the location is specified @@ -188,13 +187,13 @@ public URI getHarUri(URI original, URI base, URI originalBase) /** - * Used by metastore server to creates UGI object for a remote user. + * Used to creates UGI object for a remote user. * @param userName remote User Name * @param groupNames group names associated with remote user name * @return UGI created for the remote user. 
*/ - public UserGroupInformation createRemoteUser(String userName, List groupNames); + /** * Get the short name corresponding to the subject in the passed UGI * @@ -240,7 +239,7 @@ public URI getHarUri(URI original, URI base, URI originalBase) * @throws IOException */ public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService) - throws IOException; + throws IOException; /** * Add given service to the string format token @@ -250,7 +249,7 @@ public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenS * @throws IOException */ public String addServiceToToken(String tokenStr, String tokenService) - throws IOException; + throws IOException; enum JobTrackerState { INITIALIZING, RUNNING }; @@ -331,7 +330,7 @@ public String addServiceToToken(String tokenStr, String tokenService) * @throws IOException */ public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf) - throws IOException; + throws IOException; /** * Get the default block size for the path. FileSystem alone is not sufficient to @@ -382,6 +381,7 @@ public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUser public interface InputSplitShim extends InputSplit { JobConf getJob(); + @Override long getLength(); /** Returns an array containing the startoffsets of the files in the split. */ @@ -406,14 +406,18 @@ public void authorizeProxyAccess(String proxyUser, UserGroupInformation realUser Path[] getPaths(); /** Returns all the Paths where this input-split resides. */ + @Override String[] getLocations() throws IOException; void shrinkSplit(long length); + @Override String toString(); + @Override void readFields(DataInput in) throws IOException; + @Override void write(DataOutput out) throws IOException; } @@ -445,7 +449,7 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte * @throws IOException */ Iterator listLocatedStatus(FileSystem fs, Path path, - PathFilter filter) throws IOException; + PathFilter filter) throws IOException; /** * For file status returned by listLocatedStatus, convert them into a list @@ -456,7 +460,7 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte * @throws IOException */ BlockLocation[] getLocations(FileSystem fs, - FileStatus status) throws IOException; + FileStatus status) throws IOException; public HCatHadoopShims getHCatShim(); public interface HCatHadoopShims { @@ -468,10 +472,10 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte public TaskAttemptID createTaskAttemptID(); public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, - TaskAttemptID taskId); + TaskAttemptID taskId); public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf, - org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable); + org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable); public JobContext createJobContext(Configuration conf, JobID jobId); @@ -603,7 +607,7 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte } public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec); - + /** * Get configuration from JobContext */ diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java index e69373a..d0d6c7b 100644 --- 
a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java @@ -16,39 +16,53 @@ * limitations under the License. */ - package org.apache.hadoop.hive.thrift; +package org.apache.hadoop.hive.thrift; - import java.io.IOException; +import java.io.IOException; import java.net.InetAddress; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TProcessor; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; - /** - * This class is only overridden by the secure hadoop shim. It allows - * the Thrift SASL support to bridge to Hadoop's UserGroupInformation - * & DelegationToken infrastructure. - */ - public class HadoopThriftAuthBridge { - public Client createClient() { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } +/** + * This class is only overridden by the secure hadoop shim. It allows + * the Thrift SASL support to bridge to Hadoop's UserGroupInformation + * & DelegationToken infrastructure. + */ +public class HadoopThriftAuthBridge { + public Client createClient() { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public Client createClientWithConf(String authType) { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public UserGroupInformation getCurrentUGIWithConf(String authType) + throws IOException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + - public Client createClientWithConf(String authType) { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } + public String getServerPrincipal(String principalConfig, String host) + throws IOException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } - public Server createServer(String keytabFile, String principalConf) - throws TTransportException { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } + public Server createServer(String keytabFile, String principalConf) + throws TTransportException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } /** @@ -58,47 +72,47 @@ public Server createServer(String keytabFile, String principalConf) * @param conf * @return Hadoop SASL configuration */ - public Map getHadoopSaslProperties(Configuration conf) { - throw new UnsupportedOperationException( - "The current version of Hadoop does not support Authentication"); - } + public Map getHadoopSaslProperties(Configuration conf) { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } - public static abstract class Client { - /** - * - * @param principalConfig In the case of Kerberos authentication this will - * be the kerberos principal name, for DIGEST-MD5 (delegation token) based - * authentication this will be null - * @param host The metastore server host name - * @param methodStr "KERBEROS" or "DIGEST" - * @param tokenStrForm This is url encoded string form 
of - * org.apache.hadoop.security.token. - * @param underlyingTransport the underlying transport - * @return the transport - * @throws IOException - */ - public abstract TTransport createClientTransport( - String principalConfig, String host, - String methodStr, String tokenStrForm, TTransport underlyingTransport, - Map saslProps) - throws IOException; - } + public static abstract class Client { + /** + * + * @param principalConfig In the case of Kerberos authentication this will + * be the kerberos principal name, for DIGEST-MD5 (delegation token) based + * authentication this will be null + * @param host The metastore server host name + * @param methodStr "KERBEROS" or "DIGEST" + * @param tokenStrForm This is url encoded string form of + * org.apache.hadoop.security.token. + * @param underlyingTransport the underlying transport + * @return the transport + * @throws IOException + */ + public abstract TTransport createClientTransport( + String principalConfig, String host, + String methodStr, String tokenStrForm, TTransport underlyingTransport, + Map saslProps) + throws IOException; + } - public static abstract class Server { - public abstract TTransportFactory createTransportFactory(Map saslProps) throws TTransportException; - public abstract TProcessor wrapProcessor(TProcessor processor); - public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor); - public abstract InetAddress getRemoteAddress(); - public abstract void startDelegationTokenSecretManager(Configuration conf, - Object hmsHandler) throws IOException; - public abstract String getDelegationToken(String owner, String renewer) - throws IOException, InterruptedException; - public abstract String getDelegationTokenWithService(String owner, String renewer, String service) - throws IOException, InterruptedException; - public abstract String getRemoteUser(); - public abstract long renewDelegationToken(String tokenStrForm) throws IOException; - public abstract void cancelDelegationToken(String tokenStrForm) throws IOException; - public abstract String getUserFromToken(String tokenStr) throws IOException; - } - } + public static abstract class Server { + public abstract TTransportFactory createTransportFactory(Map saslProps) throws TTransportException; + public abstract TProcessor wrapProcessor(TProcessor processor); + public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor); + public abstract InetAddress getRemoteAddress(); + public abstract void startDelegationTokenSecretManager(Configuration conf, + Object hmsHandler) throws IOException; + public abstract String getDelegationToken(String owner, String renewer) + throws IOException, InterruptedException; + public abstract String getDelegationTokenWithService(String owner, String renewer, String service) + throws IOException, InterruptedException; + public abstract String getRemoteUser(); + public abstract long renewDelegationToken(String tokenStrForm) throws IOException; + public abstract void cancelDelegationToken(String tokenStrForm) throws IOException; + public abstract String getUserFromToken(String tokenStr) throws IOException; + } +}