diff --git itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/MiniHiveKdc.java itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/MiniHiveKdc.java index dedbf35..eba0646 100644 --- itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/MiniHiveKdc.java +++ itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/MiniHiveKdc.java @@ -101,7 +101,7 @@ public MiniHiveKdc(Configuration conf) miniKdc.start(); // create default users - addUserPrincipal(getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL)); + addUserPrincipal(getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL, null)); addUserPrincipal(HIVE_TEST_USER_1); addUserPrincipal(HIVE_TEST_USER_2); addUserPrincipal(HIVE_TEST_SUPER_USER); @@ -121,6 +121,10 @@ public void addUserPrincipal(String principal) throws Exception { userPrincipals.put(principal, keytab.getPath()); } + public void addServicePrincipalWithHost(String host) throws Exception { + addUserPrincipal(getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL, host)); + } + /** * Login the given principal, using corresponding keytab file from internal map * @param principal @@ -142,20 +146,31 @@ public String getFullyQualifiedUserPrincipal(String shortUserName) { return shortUserName + "@" + miniKdc.getRealm(); } - public String getFullyQualifiedServicePrincipal(String shortUserName) { - return getServicePrincipalForUser(shortUserName) + "@" + miniKdc.getRealm(); + public String getFullyQualifiedServicePrincipal(String shortUserName, String host) { + return getServicePrincipalForUser(shortUserName, host) + "@" + miniKdc.getRealm(); } - public String getServicePrincipalForUser(String shortUserName) { - return shortUserName + "/" + miniKdc.getHost(); + public String getServicePrincipalForUser(String shortUserName, String host) { + if (host == null) + return shortUserName + "/" + miniKdc.getHost(); + else + return shortUserName + "/" + host; } public String getHiveServicePrincipal() { - return getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL); + 
return getHiveServicePrincipal(null); + } + + public String getHiveServicePrincipal(String host) { + return getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL, host); } public String getFullHiveServicePrincipal() { - return getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL) + "@" + miniKdc.getRealm(); + return getFullHiveServicePrincipal(null); + } + + public String getFullHiveServicePrincipal(String host) { + return getServicePrincipalForUser(HIVE_SERVICE_PRINCIPAL, host) + "@" + miniKdc.getRealm(); } public String getDefaultUserPrincipal() { @@ -171,13 +186,12 @@ public String getDefaultUserPrincipal() { */ public static MiniHS2 getMiniHS2WithKerb(MiniHiveKdc miniHiveKdc, HiveConf hiveConf) throws Exception { String hivePrincipal = - miniHiveKdc.getFullyQualifiedServicePrincipal(MiniHiveKdc.HIVE_SERVICE_PRINCIPAL); + miniHiveKdc.getFullyQualifiedServicePrincipal(MiniHiveKdc.HIVE_SERVICE_PRINCIPAL, null); String hiveKeytab = miniHiveKdc.getKeyTabFile( - miniHiveKdc.getServicePrincipalForUser(MiniHiveKdc.HIVE_SERVICE_PRINCIPAL)); + miniHiveKdc.getServicePrincipalForUser(MiniHiveKdc.HIVE_SERVICE_PRINCIPAL, null)); return new MiniHS2.Builder().withConf(hiveConf). withMiniKdc(hivePrincipal, hiveKeytab).build(); } - } diff --git itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveMetaStoreClientWithMiniKdc.java itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveMetaStoreClientWithMiniKdc.java new file mode 100644 index 0000000..9c6679c --- /dev/null +++ itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHiveMetaStoreClientWithMiniKdc.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.minikdc; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Enumeration; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.util.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestHiveMetaStoreClientWithMiniKdc { + private MiniHiveKdc miniHiveKdc; + private HiveConf hiveConf; + private ServerSocket serverSock; + private int port; + private HiveMetaStoreClient msc; + private String mscIP; + + @Before + public void setUp() throws Exception { + hiveConf = new HiveConf(); + port = MetaStoreUtils.findFreePort(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); + + miniHiveKdc = MiniHiveKdc.getMiniHiveKdc(hiveConf); + mscIP = getInterface(); + // add service principal in miniHiveKdc + miniHiveKdc.addServicePrincipalWithHost(mscIP); + } + + @After + public void tearDown() throws Exception { + miniHiveKdc.shutDown(); + } + + @Test + public void testHiveMetaStoreClientConnection() throws Exception { + Socket clientSock = null; + try { + serverSock = new ServerSocket(port); + 
+ setUGI(); + + /* HiveMetastoreClient */ + HMClient client = new HMClient(); + client.start(); + + clientSock = serverSock.accept(); + assertEquals(mscIP, clientSock.getInetAddress().getHostAddress()); + + } catch (IOException e) { + throw e; + } finally { + serverSock.close(); + } + } + + private void setUGI() throws Exception { + String servicePrinc = miniHiveKdc.getHiveServicePrincipal(mscIP); + miniHiveKdc.loginUser(servicePrinc); + } + + private String getInterface() throws Exception { + Enumeration nics; + nics = NetworkInterface.getNetworkInterfaces(); + + while(nics.hasMoreElements()) + { + NetworkInterface nic = (NetworkInterface) nics.nextElement(); + Enumeration nicaddrs = nic.getInetAddresses(); + while (nicaddrs.hasMoreElements()) + { + InetAddress addr = (InetAddress) nicaddrs.nextElement(); + System.out.println(addr.getHostAddress()); + // only check IPv4 + if (addr.getHostAddress().contains(".") + && !addr.getHostAddress().equals("127.0.0.1")) { + return addr.getHostAddress(); + } + } + } + + /* If the system does not have other network address, + * it returns default localhost */ + return "127.0.0.1"; + } + + /** + * HiveMetaStore client: It will open the socket connection when we create new object. 
+ */ + private class HMClient extends Thread { + public HMClient() { + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); + } + + @Override + public void run() { + try { + Thread.sleep(1000); + msc = new HiveMetaStoreClient(hiveConf, null); + } catch (Throwable e) { + System.err.println("Unable to open the metastore"); + System.err.println(StringUtils.stringifyException(e)); + } + } + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index a5f5053..716a54c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -28,8 +28,10 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.URI; import java.net.UnknownHostException; +import java.net.Socket; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -137,6 +139,8 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TApplicationException; @@ -375,11 +379,37 @@ private void open() throws MetaException { int clientSocketTimeout = (int) conf.getTimeVar( ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); + Socket socket = null; + String hiveHost = null; + try { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + if (loginUser != null && loginUser.hasKerberosCredentials()) { + hiveHost = SecurityUtil.getHostFromPrincipal(loginUser.getUserName()); + } + } catch (IOException ioe) { + 
LOG.error("Couldn't create client transport", ioe); + throw new MetaException(ioe.toString()); + } + for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { for (URI store : metastoreUris) { LOG.info("Trying to connect to metastore with URI " + store); try { - transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); + try { + TSocket tsocket = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); + if (hiveHost != null && !hiveHost.isEmpty()) { + socket = tsocket.getSocket(); + InetAddress localAddr = NetUtils.getLocalInetAddress(hiveHost); + if (localAddr != null) + socket.bind(new InetSocketAddress(localAddr, 0)); + } + transport = tsocket; + } catch (IOException ioe) { + LOG.error("Couldn't create client transport to HiveMetastore(" + + store.getHost() + ":" + store.getPort() + ")", ioe); + throw new MetaException(ioe.toString()); + } + if (useSasl) { // Wrap thrift connection with SASL for secure connection. try {