diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index 326d646..9ad5f53 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -103,6 +103,12 @@ org.apache.hive + hive-jdbc + ${project.version} + test + + + org.apache.hive hive-metastore ${project.version} tests @@ -251,6 +257,12 @@ ${hadoop.version} test + + org.apache.curator + curator-test + ${curator.version} + test + diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java new file mode 100644 index 0000000..b153679 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.jdbc; + +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test Hive dynamic service discovery. + */ +public class TestServiceDiscovery { + private static TestingServer server; + private static CuratorFramework client; + private static String rootNamespace = "hiveserver2"; + + @BeforeClass + public static void setup() throws Exception { + server = new TestingServer(); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + } + + @After + public void teardown() throws Exception { + client.close(); + server.close(); + server = null; + } + + @Test + public void testConnect() throws Exception { + Map confs = new HashMap(); + confs.put("hive.server2.thrift.bind.host", "host-1"); + confs.put("hive.server2.transport.mode", "binary"); + confs.put("hive.server2.thrift.port", "8000"); + confs.put("hive.server2.authentication", "PLAIN"); + publishConfsToZk(confs, "uri1"); + + confs.clear(); + confs.put("hive.server2.thrift.bind.host", "host-2"); + confs.put("hive.server2.transport.mode", "binary"); + confs.put("hive.server2.thrift.port", "9000"); + confs.put("hive.server2.authentication", "PLAIN"); + publishConfsToZk(confs, "uri2"); + + confs.clear(); + confs.put("hive.server2.thrift.bind.host", "host-3"); + confs.put("hive.server2.transport.mode", "binary"); + confs.put("hive.server2.thrift.port", "10000"); + confs.put("hive.server2.authentication", "PLAIN"); + publishConfsToZk(confs, "uri3"); + + Utils.JdbcConnectionParams connParams = new Utils.JdbcConnectionParams(); + connParams.setZooKeeperEnsemble(server.getConnectString()); + connParams.getSessionVars().put(Utils.JdbcConnectionParams.ZOOKEEPER_NAMESPACE, "hiveserver2"); + + List allConnectParams = new ArrayList<>(); + + while (true) { + //Reject all paths to force it to continue. When no more paths, should throw an exception. + try { + ZooKeeperHiveClientHelper.configureConnParams(connParams); + } catch (ZooKeeperHiveClientException e) { + break; + } + connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath()); + allConnectParams.add(new ConnParamInfo(connParams.getHost(), connParams.getPort(), + connParams.getCurrentHostZnodePath())); + } + + //Make sure it itereated through all possible ConnParams + Collection cp1 = Collections2.filter(allConnectParams, + new ConnParamInfoPred("host-1", 8000, "serverUri=uri1")); + Collection cp2 = Collections2.filter(allConnectParams, + new ConnParamInfoPred("host-2", 9000, "serverUri=uri2")); + Collection cp3 = Collections2.filter(allConnectParams, + new ConnParamInfoPred("host-3", 10000, "serverUri=uri3")); + + Assert.assertEquals(cp1.size(), 1); + Assert.assertEquals(cp2.size(), 1); + Assert.assertEquals(cp3.size(), 1); + } + + //Helper classes for ConnParam comparison logics. + private class ConnParamInfo { + String host; + int port; + String path; + + public ConnParamInfo(String host, int port, String path) { + this.host = host; + this.port = port; + this.path = path; + } + } + + private class ConnParamInfoPred implements Predicate { + String host; + int port; + String pathPrefix; + + ConnParamInfoPred(String host, int port, String pathPrefix) { + this.host = host; + this.port = port; + this.pathPrefix = pathPrefix; + } + + @Override + public boolean apply(ConnParamInfo inputParam) { + return inputParam.host.equals(host) && inputParam.port == port && + inputParam.path.startsWith(pathPrefix); + } + } + + //Mocks HS2 publishing logic. + private void publishConfsToZk(Map confs, String uri) throws Exception { + try { + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); + } catch (KeeperException e) { + Assert.assertTrue(e.code() == KeeperException.Code.NODEEXISTS); + } + String pathPrefix = ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + uri + ";" + + "sequence="; + String znodeData = ""; + // Publish configs for this instance as the data on the node + znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confs); + byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8")); + PersistentEphemeralNode znode = + new PersistentEphemeralNode(client, + PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8); + znode.start(); + // We'll wait for 120s for node creation + long znodeCreationTimeout = 120; + if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { + throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); + } + } +} diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java index 6c21423..44c2eb7 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -105,8 +105,7 @@ private static void applyConfs(String serverConfStr, JdbcConnectionParams connPa + " published by the server."); } // Set host - if ((matcher.group(1).equals("hive.server2.thrift.bind.host")) - && (connParams.getHost() == null)) { + if (matcher.group(1).equals("hive.server2.thrift.bind.host")) { connParams.setHost(matcher.group(2)); } // Set transportMode @@ -115,7 +114,7 @@ private static void applyConfs(String serverConfStr, JdbcConnectionParams connPa connParams.getSessionVars().put(JdbcConnectionParams.TRANSPORT_MODE, matcher.group(2)); } // Set port - if ((matcher.group(1).equals("hive.server2.thrift.port")) && !(connParams.getPort() > 0)) { + if (matcher.group(1).equals("hive.server2.thrift.port")) { connParams.setPort(Integer.parseInt(matcher.group(2))); } if ((matcher.group(1).equals("hive.server2.thrift.http.port"))