diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4f2e6d3..dfb91aa 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1911,6 +1911,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.MILLISECONDS), "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" + "if a heartbeat is not sent in the timeout."), + HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s", + new TimeValidator(TimeUnit.SECONDS), + "ZooKeeper client's connection timeout in seconds. The Curator client deems the connection to ZooKeeper lost\n" + + "after connection timeout * hive.zookeeper.connection.max.retries, with exponential backoff between retries."), HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace", "The parent node under which all ZooKeeper nodes are created."), HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false, @@ -2470,6 +2474,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The maximum number of past queries to show in HiveServer2 WebUI."), // Tez session settings + HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false, + "Whether HiveServer2 Active/Passive High Availability should be enabled when Hive Interactive sessions are enabled.\n" + + "This will also require hive.server2.support.dynamic.service.discovery to be enabled."), + HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace", + "hs2ActivePassiveHA", + "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" + + "instances with ZooKeeper."), HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "", "A single YARN queue to use for Hive Interactive sessions. When this is specified,\n" + "workload management is enabled and used for these sessions."), diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java new file mode 100644 index 0000000..861f4d2 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.service.server.HS2ActivePassiveHARegistry; +import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; +import org.apache.hive.service.server.HiveServer2Instance; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestActivePassiveHA { + private MiniHS2 miniHS2_1 = null; + private MiniHS2 miniHS2_2 = null; + private static TestingServer zkServer; + private Connection hs2Conn = null; + private HiveConf hiveConf1; + private HiveConf hiveConf2; + + @BeforeClass + public static void beforeTest() throws Exception { + MiniHS2.cleanupLocalDir(); + zkServer = new TestingServer(); + Class.forName(MiniHS2.getJdbcDriverName()); + } + + @AfterClass + public static void afterTest() throws Exception { + if (zkServer != null) { + zkServer.close(); + zkServer = null; + } + MiniHS2.cleanupLocalDir(); + } + + @Before + public void setUp() throws Exception { + hiveConf1 = new HiveConf(); + hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + // Set up zookeeper dynamic service discovery configs + setHAConfigs(hiveConf1); + miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build(); + hiveConf2 = new HiveConf(); + hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + // Set up zookeeper dynamic service discovery configs + setHAConfigs(hiveConf2); + miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build(); + } + + @After + public void tearDown() throws Exception { + if (hs2Conn != null) { + hs2Conn.close(); + } + if ((miniHS2_1 != null) && miniHS2_1.isStarted()) { + miniHS2_1.stop(); + } + if ((miniHS2_2 != null) && miniHS2_2.isStarted()) { + miniHS2_2.stop(); + } + } + + private static void setHAConfigs(Configuration conf) { + conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true); + conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString()); + final String zkRootNamespace = "hs2test"; + conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace); + conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true); + conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS); + conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS); + conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1); + } + + @Test(timeout = 60000) + public void testActivePassive() throws Exception { + Map confOverlay = new HashMap<>(); + final Exception[] e1 = new Exception[1]; + Thread t1 = new Thread(() -> { + try { + miniHS2_1.start(confOverlay); + } catch (Exception e) { + e1[0] = e; + } + }); + t1.start(); + t1.join(); + + assertNull(e1[0]); + 
while(!miniHS2_1.isLeader()) { + Thread.sleep(100); + } + assertEquals(true, miniHS2_1.isLeader()); + String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("true", sendGet(url)); + + final Exception[] e2 = new Exception[1]; + Thread t2 = new Thread(() -> { + try { + miniHS2_2.start(confOverlay); + } catch (Exception e) { + e2[0] = e; + } + }); + + t2.start(); + t2.join(); + assertNull(e2[0]); + while(miniHS2_2.isLeader()) { + Thread.sleep(100); + } + assertEquals(false, miniHS2_2.isLeader()); + url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("false", sendGet(url)); + + Configuration conf = new Configuration(); + setHAConfigs(conf); + HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf); + List hs2Instances = new ArrayList<>(client.getAll()); + assertEquals(2, hs2Instances.size()); + List leaders = new ArrayList<>(); + List standby = new ArrayList<>(); + for (HiveServer2Instance instance : hs2Instances) { + if (instance.isLeader()) { + leaders.add(instance); + } else { + standby.add(instance); + } + } + assertEquals(1, leaders.size()); + assertEquals(1, standby.size()); + + miniHS2_1.stop(); + + while(!miniHS2_2.isLeader()) { + Thread.sleep(100); + } + assertEquals(true, miniHS2_2.isLeader()); + url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("true", sendGet(url)); + + // give some time for the path children cache to be updated after failover + Thread.sleep(3000); + + client = HS2ActivePassiveHARegistryClient.getClient(conf); + hs2Instances = new ArrayList<>(client.getAll()); + assertEquals(1, hs2Instances.size()); + leaders = new ArrayList<>(); + standby = new ArrayList<>(); + for (HiveServer2Instance instance : hs2Instances) { + if (instance.isLeader()) { + leaders.add(instance); + } else { + standby.add(instance); + } + } + assertEquals(1, leaders.size()); + assertEquals(0, standby.size()); + + + // start 1st instance again + t1 = new Thread(() -> { + try { + miniHS2_1.start(confOverlay); + } catch (Exception e) { + e1[0] = e; + } + }); + t1.start(); + t1.join(); + + while(miniHS2_1.isLeader()) { + Thread.sleep(100); + } + assertEquals(false, miniHS2_1.isLeader()); + url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals("false", sendGet(url)); + + // give some time for the path children cache to receive updates + Thread.sleep(3000); + + client = HS2ActivePassiveHARegistryClient.getClient(conf); + hs2Instances = new ArrayList<>(client.getAll()); + assertEquals(2, hs2Instances.size()); + leaders = new ArrayList<>(); + standby = new ArrayList<>(); + for (HiveServer2Instance instance : hs2Instances) { + if (instance.isLeader()) { + leaders.add(instance); + } else { + standby.add(instance); + } + } + assertEquals(1, leaders.size()); + assertEquals(1, standby.size()); + } + + private String sendGet(String url) throws Exception { + URL obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); + String inputLine; + StringBuilder response = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + return response.toString(); + } +} \ No newline at end of file diff --git 
a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java index 6cab8cd..d21b764 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java @@ -35,14 +35,16 @@ private String hostname; private int binaryPort; private int httpPort; + private int webPort; private boolean startedHiveService = false; private List addedProperties = new ArrayList(); - public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) { + public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) { this.hiveConf = hiveConf; this.hostname = hostname; this.binaryPort = binaryPort; this.httpPort = httpPort; + this.webPort = webPort; } /** @@ -136,6 +138,10 @@ public int getHttpPort() { return httpPort; } + public int getWebPort() { + return webPort; + } + public boolean isStarted() { return startedHiveService; } diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 8bbf8a4..997726c 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -217,6 +217,8 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils .findFreePort()), (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils + .findFreePort()), + (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils .findFreePort())); hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l); hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10, @@ -306,6 +308,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost()); hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort()); hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort()); + hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort()); Path scratchDir = new Path(baseFsDir, "scratch"); // Create root scratchdir with write all, so that user impersonation has no issues. 
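The MiniHS2 changes above thread a web UI port through AbstractHiveService so tests can reach HiveServer2's HTTP endpoints; TestActivePassiveHA uses it to poll the new /leader page, which returns "true" on the active instance and "false" on a standby. A minimal sketch of such a probe, assuming an HS2 instance with the web UI enabled (the class and method names are illustrative, not part of the patch):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative helper: ask an HS2 instance's web UI whether it currently
// holds leadership. The port is what AbstractHiveService.getWebPort() returns.
public class LeaderProbe {
  public static boolean isLeader(String host, int webPort) throws Exception {
    URL url = new URL("http://" + host + ":" + webPort + "/leader");
    HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("GET");
    try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
      // The servlet writes a bare "true" or "false" body.
      return Boolean.parseBoolean(in.readLine());
    }
  }
}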
@@ -404,6 +407,10 @@ public void cleanup() { } + public boolean isLeader() { + return hiveServer2.isLeader(); + } + public CLIServiceClient getServiceClient() { verifyStarted(); return getServiceClientInternal(); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index 5d7f813..6178b4b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -14,13 +14,16 @@ package org.apache.hadoop.hive.llap.registry; import java.io.IOException; + +import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.yarn.api.records.ApplicationId; /** * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. */ -public interface ServiceRegistry { +public interface ServiceRegistry<T extends ServiceInstance> { /** * Start the service registry @@ -49,14 +52,14 @@ * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not * started yet. 0 means do not wait. */ - LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException; + ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws + IOException; /** * Adds state change listeners for service instances. * @param listener - state change listener */ - void registerStateChangeListener( - ServiceInstanceStateChangeListener listener) throws IOException; + void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException; /** * @return The application ID of the LLAP cluster.
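Parameterizing ServiceRegistry by instance type makes the state-change callbacks type-safe: a listener registered with an LLAP registry now receives LlapServiceInstance objects directly. A hedged sketch of such a listener, assuming ServiceInstanceStateChangeListener carries the same type parameter as the registry (the class name and log messages are illustrative):

import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;

// Illustrative listener: ephSeqVersion is the ephemeral sequence number that
// ZkRegistryBase extracts from the worker znode name when dispatching events.
class LoggingStateChangeListener implements ServiceInstanceStateChangeListener<LlapServiceInstance> {
  @Override
  public void onCreate(LlapServiceInstance instance, int ephSeqVersion) {
    System.out.println("joined: " + instance.getWorkerIdentity() + " on " + instance.getHost());
  }

  @Override
  public void onUpdate(LlapServiceInstance instance, int ephSeqVersion) {
    System.out.println("updated: " + instance.getWorkerIdentity());
  }

  @Override
  public void onRemove(LlapServiceInstance instance, int ephSeqVersion) {
    System.out.println("left: " + instance.getWorkerIdentity());
  }
}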
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index c88198f..f99d86c 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -267,8 +268,7 @@ public LlapServiceInstanceSet getInstances(String component, long timeoutMs) thr } @Override - public void registerStateChangeListener( - final ServiceInstanceStateChangeListener listener) { + public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException { // nothing to set LOG.warn("Callbacks for instance state changes are not supported in fixed registry."); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 80a6aba..6d0a552 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; @@ -35,7 +36,7 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class); - private ServiceRegistry registry = null; + private ServiceRegistry<LlapServiceInstance> registry = null; private final boolean isDaemon; private boolean isDynamic = false; private String identity = "(pending)"; @@ -131,11 +132,11 @@ private void unregisterWorker() throws IOException { } } - public LlapServiceInstanceSet getInstances() throws IOException { + public ServiceInstanceSet getInstances() throws IOException { return getInstances(0); } - public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { + public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { return this.registry.getInstances("LLAP", clusterReadyTimeoutMs); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 8339230..b5a79c6 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory; public class LlapZookeeperRegistryImpl - extends ZkRegistryBase implements ServiceRegistry { + extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> {
private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class); /** @@ -79,7 +79,7 @@ public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { super(instanceName, conf, HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, - USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE), @@ -327,9 +327,9 @@ private static String extractWorkerIdFromSlot(ChildData childData) { if (data == null) continue; String nodeName = extractNodeName(childData); if (nodeName.startsWith(WORKER_PREFIX)) { - Set instances = getInstancesByPath(childData.getPath()); + LlapServiceInstance instances = getInstancesByPath(childData.getPath()); if (instances != null) { - unsorted.addAll(instances); + unsorted.add(instances); } } else if (nodeName.startsWith(SLOT_PREFIX)) { slotByWorker.put(extractWorkerIdFromSlot(childData), diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java index 32d5caa..3208e21 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -55,7 +56,7 @@ private final SocketFactory socketFactory; private final RetryPolicy retryPolicy; private final Configuration conf; - private LlapServiceInstanceSet activeInstances; + private ServiceInstanceSet activeInstances; private Collection lastKnownInstances; private LlapManagementProtocolClientImpl client; private LlapServiceInstance clientInstance; diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java new file mode 100644 index 0000000..e069e43 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; + +public class RegistryUtilities { + private static final String LOCALHOST = "localhost"; + + /** + * Will return hostname stored in InetAddress. 
+ * + * @return hostname + */ + public static String getHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + /** + * Will return FQDN of the host after doing reverse DNS lookup. + * + * @return FQDN of host + */ + public static String getCanonicalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + public static String getUUID() { + return String.valueOf(UUID.randomUUID()); + } +} \ No newline at end of file diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java index 908b3bb..4493e99 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java @@ -21,27 +21,27 @@ * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought * back on the same host/port */ - public abstract String getWorkerIdentity(); + String getWorkerIdentity(); /** * Hostname of the service instance * - * @return + * @return service hostname */ - public abstract String getHost(); + String getHost(); /** * RPC Endpoint for service instance - * - * @return + * + * @return rpc port */ - public int getRpcPort(); + int getRpcPort(); /** * Config properties of the Service Instance (llap.daemon.*) - * - * @return + * + * @return properties */ - public abstract Map getProperties(); + Map getProperties(); } \ No newline at end of file diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java index 34fba5c..63178cc 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java @@ -29,15 +29,15 @@ * The worker identity does not collide between restarts, so each restart will have a unique id, * while having the same host/ip pair. * - * @return + * @return instance list */ Collection getAll(); /** * Get an instance by worker identity. * - * @param name - * @return + * @param name worker id + * @return instance */ InstanceType getInstance(String name); @@ -46,13 +46,13 @@ * * The list could include dead and alive instances. * - * @param host - * @return + * @param host hostname + * @return instance list */ Set getByHost(String host); /** - * Get number of instances in the currently availabe. + * Get number of instances in the currently available.
* * @return - number of instances */ diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java index db3d788..b663b8e 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java @@ -15,6 +15,8 @@ import java.io.IOException; import java.util.Map; +import java.util.Objects; + import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; @@ -38,13 +40,12 @@ public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException } final Endpoint rpc = srv.getInternalEndpoint(rpcName); - this.host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); this.rpcPort = - Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); + Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); } @Override @@ -57,12 +58,14 @@ public boolean equals(Object o) { } ServiceInstanceBase other = (ServiceInstanceBase) o; - return this.getWorkerIdentity().equals(other.getWorkerIdentity()); + return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity()) + && Objects.equals(host, other.host) + && Objects.equals(rpcPort, other.rpcPort); } @Override public int hashCode() { - return getWorkerIdentity().hashCode(); + return getWorkerIdentity().hashCode() + host.hashCode() + (31 * rpcPort); } @Override @@ -88,6 +91,6 @@ public int getRpcPort() { @Override public String toString() { return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" - + host + ":" + rpcPort + "]"; + + host + ":" + rpcPort + "]"; } } \ No newline at end of file diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index 0724cf5..6469adf 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -13,29 +13,26 @@ */ package org.apache.hadoop.hive.registry.impl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; import org.apache.commons.codec.binary.Base64; - -import com.google.common.io.ByteStreams; - -import org.apache.tez.common.security.JobTokenIdentifier; - -import org.apache.hadoop.security.token.Token; - -import java.io.IOException; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.ByteStreams; public class TezAmInstance extends ServiceInstanceBase { private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class); private final int pluginPort; private Token token; - public TezAmInstance(ServiceRecord srv) throws IOException { +
TezAmInstance(ServiceRecord srv) throws IOException { super(srv, TezAmRegistryImpl.IPC_TEZCLIENT); final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN); if (plugin != null) { diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java index 417e571..ab02cf4 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -37,21 +37,19 @@ static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", AM_PLUGIN_JOBID = "am.plugin.jobid"; private final static String NAMESPACE_PREFIX = "tez-am-"; - private final static String USER_SCOPE_PATH_PREFIX = "user-"; - private static final String WORKER_PREFIX = "worker-"; private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; private final String registryName; - public static TezAmRegistryImpl create(Configuration conf, boolean b) { + public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) { String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); return StringUtils.isBlank(amRegistryName) ? null - : new TezAmRegistryImpl(amRegistryName, conf, true); + : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk); } private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) { - super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL), HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE), diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 17269dd..04a70c5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -13,13 +13,7 @@ */ package org.apache.hadoop.hive.registry.impl; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -34,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -50,6 +45,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.registry.RegistryUtilities; import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; @@ -65,6 +61,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * This is currently used for implementation inheritance only; it doesn't provide a unified flow * into which one can just plug a few abstract method implementations, because providing one with @@ -77,16 +79,19 @@ private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class); private final static String SASL_NAMESPACE = "sasl"; private final static String UNSECURE_NAMESPACE = "unsecure"; - - static final String UNIQUE_IDENTIFIER = "registry.unique.id"; - private static final UUID uniq = UUID.randomUUID(); + final static String USER_SCOPE_PATH_PREFIX = "user-"; + static final String WORKER_PREFIX = "worker-"; + protected static final String WORKER_GROUP = "workers"; + public static final String UNIQUE_IDENTIFIER = "registry.unique.id"; + protected static final UUID UNIQUE_ID = UUID.randomUUID(); + private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls(); protected final Configuration conf; protected final CuratorFramework zooKeeperClient; // userPathPrefix is the path specific to the user for which ACLs should be restrictive. // workersPath is the directory path where all the worker znodes are located. protected final String workersPath; - private final String userPathPrefix, workerNodePrefix; + private final String workerNodePrefix; protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data @@ -99,7 +104,9 @@ private final String disableMessage; private final Lock instanceCacheLock = new ReentrantLock(); - private final Map> pathToInstanceCache; + // there can be only one instance per path + private final Map pathToInstanceCache; + // there can be multiple instances per node private final Map> nodeToInstanceCache; // The registration znode. @@ -109,29 +116,22 @@ private PathChildrenCache instancesCache; // Created on demand. /** Local hostname. */ - protected static final String hostname; - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } + protected static final String hostname = RegistryUtilities.getCanonicalHostName(); /** * @param rootNs A single root namespace override. Not recommended. - * @param nsPrefix The namespace prefix to use with default namespaces. + * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure' + * to namespace prefix to get effective root namespace). * @param userScopePathPrefix The prefix to use for the user-specific part of the path. * @param workerPrefix The prefix to use for each worker znode. + * @param workerGroup group name to use for all workers * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed. * @param zkPrincipal ZK security principal. * @param zkKeytab ZK security keytab. * @param aclsConfig A config setting to use to determine if ACLs should be verified. 
*/ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix, - String userScopePathPrefix, String workerPrefix, + String userScopePathPrefix, String workerPrefix, String workerGroup, String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) { this.conf = new Configuration(conf); this.saslLoginContextName = zkSaslLoginContextName; @@ -145,29 +145,51 @@ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, St this.disableMessage = ""; } this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - String zkEnsemble = getQuorumServers(this.conf); this.encoder = new RegistryUtils.ServiceRecordMarshal(); - int sessionTimeout = (int) HiveConf.getTimeVar(conf, - ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); - int baseSleepTime = (int) HiveConf.getTimeVar(conf, - ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); - int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 // worker-0000000 is the sequence number which will be retained until session timeout. If a // worker does not respond due to communication interruptions it will retain the same sequence // number when it returns back. If session timeout expires, the node will be deleted and new // addition of the same node (restart) will get next sequence number - this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf); - this.workerNodePrefix = workerPrefix; - this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers"; + final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf); + this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix; + this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup); this.instancesCache = null; this.stateChangeListeners = new HashSet<>(); this.pathToInstanceCache = new ConcurrentHashMap<>(); this.nodeToInstanceCache = new ConcurrentHashMap<>(); + final String namespace = getRootNamespace(rootNs, nsPrefix); + ACLProvider aclProvider; + // get acl provider for most outer path that is non-null + if (userPathPrefix == null) { + if (instanceName == null) { + if (workerGroup == null) { + aclProvider = getACLProviderForZKPath(namespace); + } else { + aclProvider = getACLProviderForZKPath(workerGroup); + } + } else { + aclProvider = getACLProviderForZKPath(instanceName); + } + } else { + aclProvider = getACLProviderForZKPath(userScopePathPrefix); + } + this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider); + } + public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) { final boolean isSecure = UserGroupInformation.isSecurityEnabled(); - ACLProvider zooKeeperAclProvider = new ACLProvider() { + String rootNs = userProvidedNamespace; + if (rootNs == null) { + rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); + } + return rootNs; + } + + private ACLProvider getACLProviderForZKPath(String zkPath) { + final boolean isSecure = UserGroupInformation.isSecurityEnabled(); + return new ACLProvider() { @Override public List getDefaultAcl() { // We always return something from getAclForPath so this should not happen. 
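The combined effect of getRootNamespace() and the skipNulls joiner above is that registries without a user scope or instance name simply omit those path segments rather than producing empty ones. A small sketch of the resulting paths, assuming an insecure cluster and illustrative user and instance names:

import com.google.common.base.Joiner;

public class ZkPathDemo {
  private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls();

  public static void main(String[] args) {
    // LLAP-style registry: user scope, instance name and worker group all present.
    String llapNs = "llap-" + "unsecure"; // getRootNamespace(null, "llap-") without Kerberos
    String llapWorkers = "/" + PATH_JOINER.join("user-hive", "llap0", "workers");
    System.out.println(llapNs + llapWorkers); // llap-unsecure/user-hive/llap0/workers

    // HS2 HA-style registry: no user scope and no instance name; skipNulls
    // collapses the missing segments instead of emitting empty path elements.
    String hs2Ns = "hs2ActivePassiveHA-" + "unsecure";
    String hs2Instances = "/" + PATH_JOINER.join(null, null, "instances");
    System.out.println(hs2Ns + hs2Instances); // hs2ActivePassiveHA-unsecure/instances
  }
}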
@@ -177,31 +199,40 @@ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, St @Override public List getAclForPath(String path) { - if (!isSecure || path == null || !path.contains(userPathPrefix)) { + if (!isSecure || path == null || !path.contains(zkPath)) { // No security or the path is below the user path - full access. return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); } return createSecureAcls(); } }; - if (rootNs == null) { - rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path. - } + } + + private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) { + String zkEnsemble = getQuorumServers(conf); + int sessionTimeout = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + int connectionTimeout = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); + int baseSleepTime = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs - this.zooKeeperClient = CuratorFrameworkFactory.builder() - .connectString(zkEnsemble) - .sessionTimeoutMs(sessionTimeout) - .aclProvider(zooKeeperAclProvider) - .namespace(rootNs) - .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) - .build(); + return CuratorFrameworkFactory.builder() + .connectString(zkEnsemble) + .sessionTimeoutMs(sessionTimeout) + .connectionTimeoutMs(connectionTimeout) + .aclProvider(zooKeeperAclProvider) + .namespace(namespace) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) + .build(); } private static List createSecureAcls() { // Read all to the world - List nodeAcls = new ArrayList(ZooDefs.Ids.READ_ACL_UNSAFE); + List nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE); // Create/Delete/Write/Admin to creator nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); return nodeAcls; @@ -211,9 +242,9 @@ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, St * Get the ensemble server addresses from the configuration. The format is: host1:port, * host2:port.. * - * @param conf + * @param conf configuration **/ - private String getQuorumServers(Configuration conf) { + private static String getQuorumServers(Configuration conf) { String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname); String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); @@ -238,7 +269,7 @@ private String getQuorumServers(Configuration conf) { protected final String registerServiceRecord(ServiceRecord srv) throws IOException { // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); // Create a znode under the rootNamespace parent for this instance of the server try { @@ -275,11 +306,28 @@ protected final String registerServiceRecord(ServiceRecord srv) throws IOExcepti CloseableUtils.closeQuietly(znode); throw (e instanceof IOException) ? 
(IOException)e : new IOException(e); } - return uniq.toString(); + return UNIQUE_ID.toString(); } + protected final void updateServiceRecord(ServiceRecord srv) throws IOException { + try { + znode.setData(encoder.toBytes(srv)); - protected final void initializeWithoutRegisteringInternal() throws IOException { + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + } catch (Exception e) { + LOG.error("Unable to update znode with new service record", e); + CloseableUtils.closeQuietly(znode); + throw (e instanceof IOException) ? (IOException) e : new IOException(e); + } + } + + final void initializeWithoutRegisteringInternal() throws IOException { // Create a znode under the rootNamespace parent for this instance of the server try { try { @@ -345,8 +393,8 @@ private void setUpAcls(String path) throws Exception { private void addToCache(String path, String host, InstanceType instance) { instanceCacheLock.lock(); try { - putInCache(path, pathToInstanceCache, instance); - putInCache(host, nodeToInstanceCache, instance); + putInInstanceCache(path, pathToInstanceCache, instance); + putInNodeCache(host, nodeToInstanceCache, instance); } finally { instanceCacheLock.unlock(); } @@ -368,14 +416,19 @@ private void removeFromCache(String path, String host) { path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); } - private void putInCache(String key, Map> cache, + private void putInInstanceCache(String key, Map cache, InstanceType instance) { + cache.put(key, instance); + } + + private void putInNodeCache(String key, Map> cache, + InstanceType instance) { Set instanceSet = cache.get(key); if (instanceSet == null) { - instanceSet = Sets.newHashSet(); - cache.put(key, instanceSet); + instanceSet = new HashSet<>(); + instanceSet.add(instance); } - instanceSet.add(instance); + cache.put(key, instanceSet); } protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) { @@ -403,7 +456,7 @@ protected final void populateCache(PathChildrenCache instancesCache, boolean doI protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException; - protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) { + protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) { if (childData == null) return null; byte[] data = childData.getData(); if (data == null) return null; @@ -415,8 +468,7 @@ protected final void populateCache(PathChildrenCache instancesCache, boolean doI private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); @Override - public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) - throws Exception { + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) { Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); @@ -427,28 +479,30 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve if (!nodeName.startsWith(workerNodePrefix)) return; LOG.info("{} for zknode {}", event.getType(), childData.getPath()); InstanceType instance = extractServiceInstance(event, childData); - int ephSeqVersion = extractSeqNum(nodeName); - switch (event.getType()) { - case CHILD_ADDED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener 
listener : stateChangeListeners) { - listener.onCreate(instance, ephSeqVersion); - } - break; - case CHILD_UPDATED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onUpdate(instance, ephSeqVersion); - } - break; - case CHILD_REMOVED: - removeFromCache(childData.getPath(), instance.getHost()); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onRemove(instance, ephSeqVersion); + if (instance != null) { + int ephSeqVersion = extractSeqNum(nodeName); + switch (event.getType()) { + case CHILD_ADDED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onCreate(instance, ephSeqVersion); + } + break; + case CHILD_UPDATED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onUpdate(instance, ephSeqVersion); + } + break; + case CHILD_REMOVED: + removeFromCache(childData.getPath(), instance.getHost()); + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onRemove(instance, ephSeqVersion); + } + break; + default: + // Ignore all the other events; logged above. } - break; - default: - // Ignore all the other events; logged above. } } } @@ -464,7 +518,7 @@ protected final int sizeInternal() { protected final Set getByHostInternal(String host) { Set byHost = nodeToInstanceCache.get(host); - byHost = (byHost == null) ? Sets.newHashSet() : byHost; + byHost = (byHost == null) ? Sets.newHashSet() : byHost; if (LOG.isDebugEnabled()) { LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); } @@ -472,11 +526,7 @@ protected final int sizeInternal() { } protected final Collection getAllInternal() { - Set instances = new HashSet<>(); - for(Set instanceSet : pathToInstanceCache.values()) { - instances.addAll(instanceSet); - } - return instances; + return new HashSet<>(pathToInstanceCache.values()); } private static String extractNodeName(ChildData childData) { @@ -564,13 +614,17 @@ public void start() throws IOException { CloseableUtils.class.getName(); } + protected void unregisterInternal() { + CloseableUtils.closeQuietly(znode); + } + public void stop() { CloseableUtils.closeQuietly(znode); CloseableUtils.closeQuietly(instancesCache); CloseableUtils.closeQuietly(zooKeeperClient); } - protected final Set getInstancesByPath(String path) { + protected final InstanceType getInstancesByPath(String path) { return pathToInstanceCache.get(path); } diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java index 9666517..3adf6e3 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java @@ -24,6 +24,7 @@ /** * Utility methods for metrics system. 
*/ +// TODO: replace this with RegistryUtils public class MetricsUtils { private static final String LOCALHOST = "localhost"; public static final String METRICS_PROCESS_NAME = "LlapDaemon"; diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 0120639..3aec46b 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.NullWritable; @@ -343,7 +344,7 @@ private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapS private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { InetAddress address = InetAddress.getByName(host); - LlapServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstanceSet instanceSet = registryService.getInstances(); LlapServiceInstance serviceInstance = null; // The name used in the service registry may not match the host name we're using. @@ -375,7 +376,7 @@ private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService regist private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { - LlapServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstanceSet instanceSet = registryService.getInstances(); LlapServiceInstance serviceInstance = null; LOG.info("Finding random live service instance"); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index 58bf8dc..b944fad 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -34,8 +34,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; @@ -230,7 +232,8 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) { } jg.writeStringField("identity", registry.getWorkerIdentity()); jg.writeArrayFieldStart("peers"); - for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) { + ServiceInstanceSet instanceSet = registry.getInstances(); + for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) { jg.writeStartObject(); jg.writeStringField("identity", s.getWorkerIdentity()); jg.writeStringField("host", s.getHost()); diff 
--git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 66de3b8..3e52984 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -14,6 +14,16 @@ package org.apache.hadoop.hive.llap.tezplugins; +import com.google.common.io.ByteArrayDataOutput; + +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; + +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -151,7 +161,7 @@ public void setError(TaskInfo ctx, Throwable t) { private final Configuration conf; // interface into the registry service - private LlapServiceInstanceSet activeInstances; + private ServiceInstanceSet activeInstances; // Tracks all instances, including ones which have been disabled in the past. // LinkedHashMap to provide the same iteration order when selecting a random host. @@ -1362,7 +1372,8 @@ private SelectHostResult selectHost(TaskInfo request) { } /* fall through - miss in locality or no locality-requested */ - Collection instances = activeInstances.getAllInstancesOrdered(true); + Collection instances = ((LlapServiceInstanceSet) activeInstances) + .getAllInstancesOrdered(true); List allNodes = new ArrayList<>(instances.size()); List activeNodesWithFreeSlots = new ArrayList<>(); for (LlapServiceInstance inst : instances) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 82fdf6c..6a29810 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -98,13 +98,18 @@ public static TezSessionPoolManager getInstance() { protected TezSessionPoolManager() { } - public void startPool() throws Exception { + public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception { if (defaultSessionPool != null) { defaultSessionPool.start(); } if (expirationTracker != null) { expirationTracker.start(); } + initTriggers(conf); + if (resourcePlan != null) { + updateTriggers(resourcePlan); + LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); + } } public void setupPool(HiveConf conf) throws Exception { @@ -157,8 +162,6 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - initTriggers(conf); - String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b98fb58..046ea19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -738,7 +738,10 @@ public LocalResource getAppJarLr() { private Path createTezDir(String 
sessionId, String suffix) throws IOException { // tez needs its own scratch dir (per session) // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. - Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); + SessionState sessionState = SessionState.get(); + String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState + .getHdfsScratchDirURIString(); + Path tezDir = new Path(hdfsScratchDir, TEZ_DIR); tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix))); FileSystem fs = tezDir.getFileSystem(conf); FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index b33f027..29bd73f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -44,7 +45,7 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); Collection serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(true); + ((LlapServiceInstanceSet) serviceRegistry.getInstances()).getAllInstancesOrdered(true); Preconditions.checkArgument(!serviceInstances.isEmpty(), "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); ArrayList locations = new ArrayList<>(serviceInstances.size()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index a8d729d..0d1990a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +113,7 @@ public boolean initClusterInfo() { return false; // Don't fail; this is best-effort. 
} } - LlapServiceInstanceSet instances; + ServiceInstanceSet instances; try { instances = svc.getInstances(10); } catch (IOException e) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d261623..d5b683f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Before; @@ -90,7 +93,7 @@ public void testSessionPoolGetInOrder() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); // this is now a LIFO operation // draw 1 and replace @@ -153,7 +156,7 @@ public void testSessionPoolThreads() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); TezSessionState[] sessions = new TezSessionState[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { @@ -234,7 +237,7 @@ public void testLlapSessionQueuing() { conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { LOG.error("Initialization error", e); fail(); @@ -295,7 +298,7 @@ public void testReturn() { try { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { e.printStackTrace(); fail(); diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java new file mode 100644 index 0000000..e4f778c --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.service.server; + +import static org.apache.hive.service.server.HiveServer2.INSTANCE_URI_CONFIG; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hive.service.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class HS2ActivePassiveHARegistry extends ZkRegistryBase implements LeaderLatchListener, + ServiceRegistry, HiveServer2HAInstanceSet { + private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class); + static final String ACTIVE_ENDPOINT = "activeEndpoint"; + static final String PASSIVE_ENDPOINT = "passiveEndpoint"; + private static final String SASL_LOGIN_CONTEXT_NAME = "HS2ActivePassiveHAZooKeeperClient"; + private static final String INSTANCE_PREFIX = "instance-"; + private static final String INSTANCE_GROUP = "instances"; + private static final String LEADER_LATCH_PATH = "/_LEADER"; + private boolean isLeader; + private LeaderLatch leaderLatch; + private ServiceRecord srv; + private boolean isClient; + + // There are 2 paths under which the instances get registered + // 1) Standard path used by ZkRegistryBase where all instances register themselves (also stores metadata) + // Secure: /hs2ActivePassiveHA-sasl/instances/instance-0000000000 + // Unsecure: /hs2ActivePassiveHA-unsecure/instances/instance-0000000000 + // 2) Leader latch path used for HS2 HA Active/Passive configuration where all instances register under _LEADER + // path but only one among them is the leader + // Secure: /hs2ActivePassiveHA-sasl/_LEADER/xxxx-latch-0000000000 + // Unsecure: /hs2ActivePassiveHA-unsecure/_LEADER/xxxx-latch-0000000000 + static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) { + String zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE); + Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace), + HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty"); + String principal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); + String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + String zkNameSpacePrefix = zkNameSpace + "-"; + return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab, + 
SASL_LOGIN_CONTEXT_NAME, conf, isClient); + } + + private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix, + final String leaderLatchPath, + final String krbPrincipal, final String krbKeytab, final String saslContextName, final Configuration conf, + final boolean isClient) { + super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP, + saslContextName, krbPrincipal, krbKeytab, null); + this.isClient = isClient; + leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, UNIQUE_ID.toString()); + } + + @Override + public void start() throws IOException { + super.start(); + if (!isClient) { + this.srv = getNewServiceRecord(); + register(); + leaderLatch.addListener(this); + try { + // all participating instances use the same latch path; Curator picks one of them as the leader, + // which can be verified via leaderLatch.hasLeadership() + leaderLatch.start(); + } catch (Exception e) { + throw new IOException(e); + } + LOG.info("Registered HS2 with ZK. service record: {}", srv); + } else { + populateCache(); + } + } + + @Override + public String register() throws IOException { + updateEndpoint(srv, PASSIVE_ENDPOINT); + return registerServiceRecord(srv); + } + + @Override + public void unregister() { + CloseableUtils.closeQuietly(leaderLatch); + unregisterInternal(); + } + + private void populateCache() throws IOException { + PathChildrenCache pcc = ensureInstancesCache(0); + populateCache(pcc, false); + } + + @Override + public ServiceInstanceSet getInstances(final String component, final long clusterReadyTimeoutMs) + throws IOException { + throw new IOException("Not supported to get instances by component name"); + } + + private void addActiveEndpointToServiceRecord() throws IOException { + addEndpointToServiceRecord(getNewServiceRecord(), ACTIVE_ENDPOINT); + } + + private void addPassiveEndpointToServiceRecord() throws IOException { + addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT); + } + + private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException { + updateEndpoint(srv, endpointName); + updateServiceRecord(srv); + } + + private void updateEndpoint(final ServiceRecord srv, final String endpointName) { + final String instanceUri = srv.get(INSTANCE_URI_CONFIG); + final String[] tokens = instanceUri.split(":"); + final String hostname = tokens[0]; + final int port = Integer.parseInt(tokens[1]); + Endpoint urlEndpoint = RegistryTypeUtils.ipcEndpoint(endpointName, new InetSocketAddress(hostname, port)); + srv.addInternalEndpoint(urlEndpoint); + LOG.info("Added {} endpoint to service record", urlEndpoint); + } + + @Override + public void stop() { + CloseableUtils.closeQuietly(leaderLatch); + super.stop(); + } + + @Override + protected HiveServer2Instance createServiceInstance(final ServiceRecord srv) throws IOException { + Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT); + return new HiveServer2Instance(srv, activeEndpoint != null ?
ACTIVE_ENDPOINT : PASSIVE_ENDPOINT); + } + + @Override + public synchronized void registerStateChangeListener( + final ServiceInstanceStateChangeListener listener) + throws IOException { + super.registerStateChangeListener(listener); + } + + @Override + public ApplicationId getApplicationId() throws IOException { + throw new IOException("Not supported until HS2 runs as YARN application"); + } + + @Override + protected String getZkPathUser(final Configuration conf) { + return RegistryUtils.currentUser(); + } + + private boolean hasLeadership() { + return leaderLatch.hasLeadership(); + } + + // Do only lightweight actions in the main event-handler thread. Time-consuming operations are handled via a + // separate executor service registered via registerLeaderLatchListener() + @Override + public void isLeader() { + // Only the leader publishes its instance URI as an endpoint; clients use that endpoint to connect to HS2 via + // service discovery. + if (hasLeadership()) { + if (isLeader) { + LOG.info("Already handled the leadership event, ignoring.."); + return; + } + try { + addActiveEndpointToServiceRecord(); + isLeader = true; + } catch (IOException e) { + throw new ServiceException("Unable to add active endpoint to service record", e); + } + LOG.info("HS2 instance in ACTIVE mode. Service record: {}", srv); + } + } + + @Override + public void notLeader() { + if (!hasLeadership()) { + if (!isLeader) { + LOG.info("Already handled the lost leadership event, ignoring.."); + return; + } + try { + addPassiveEndpointToServiceRecord(); + isLeader = false; + } catch (IOException e) { + throw new ServiceException("Unable to add passive endpoint to service record", e); + } + LOG.info("HS2 instance lost leadership. Switched to PASSIVE standby mode. Service record: {}", srv); + } + } + + @Override + public HiveServer2Instance getLeader() { + for (HiveServer2Instance hs2Instance : getAll()) { + if (hs2Instance.isLeader()) { + return hs2Instance; + } + } + return null; + } + + @Override + public Collection<HiveServer2Instance> getAll() { + return getAllInternal(); + } + + @Override + public HiveServer2Instance getInstance(final String instanceId) { + for (HiveServer2Instance hs2Instance : getAll()) { + if (hs2Instance.getWorkerIdentity().equals(instanceId)) { + return hs2Instance; + } + } + return null; + } + + @Override + public Set<HiveServer2Instance> getByHost(final String host) { + return getByHostInternal(host); + } + + @Override + public int size() { + return sizeInternal(); + } + + void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) { + leaderLatch.addListener(latchListener, executorService); + } + + void unregisterLeaderLatchListener(final LeaderLatchListener latchListener) { + leaderLatch.removeListener(latchListener); + } + + private Map<String, String> getConfsToPublish() { + final Map<String, String> confsToPublish = new HashMap<>(); + // Hostname + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname)); + // Hostname:port + confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG)); + confsToPublish.put(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + // Transport mode + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname)); + // Transport specific confs + if (HiveServer2.isHTTPTransportMode(conf)) { + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname)); + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname)); + } else { + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname)); + } + // Auth specific confs + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname)); + if (HiveServer2.isKerberosAuthMode(conf)) { + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname)); + } + // SSL conf + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname)); + return confsToPublish; + } + + private ServiceRecord getNewServiceRecord() { + ServiceRecord srv = new ServiceRecord(); + final Map<String, String> confsToPublish = getConfsToPublish(); + for (Map.Entry<String, String> entry : confsToPublish.entrySet()) { + srv.set(entry.getKey(), entry.getValue()); + } + return srv; + } +} diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java new file mode 100644 index 0000000..512d4e8 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hive.service.server; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class HS2ActivePassiveHARegistryClient { + private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistryClient.class); + private static final Map<String, HS2ActivePassiveHARegistry> hs2Registries = new HashMap<>(); + + /** + * Helper method to get an HS2ActivePassiveHARegistry instance for reading from the registry. Used only by clients + * (JDBC via service discovery) to locate the active HS2 instance in an Active/Passive HA configuration. + * + * @param conf {@link Configuration} instance which contains service registry information.
+ * @return HS2ActivePassiveHARegistry + */ + public static synchronized HS2ActivePassiveHARegistry getClient(Configuration conf) throws IOException { + String namespace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE); + Preconditions.checkArgument(!StringUtils.isBlank(namespace), + HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty"); + String nsKey = ZkRegistryBase.getRootNamespace(null, namespace + "-"); + HS2ActivePassiveHARegistry registry = hs2Registries.get(nsKey); + if (registry == null) { + registry = HS2ActivePassiveHARegistry.create(conf, true); + registry.start(); + hs2Registries.put(nsKey, registry); + } else { + LOG.debug("Returning cached registry client for nsKey: {}", nsKey); + } + return registry; + } +} diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2a528cd..5c09e57 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -27,10 +27,11 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -38,6 +39,7 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -45,8 +47,10 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JvmPauseMonitor; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; @@ -83,6 +87,7 @@ import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; +import org.apache.hive.service.servlet.HS2LeadershipStatus; import org.apache.hive.service.servlet.QueryProfileServlet; import org.apache.logging.log4j.util.Strings; import org.apache.zookeeper.CreateMode; @@ -98,6 +103,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * HiveServer2.
@@ -106,6 +112,8 @@ public class HiveServer2 extends CompositeService { private static CountDownLatch deleteSignal; private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class); + public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri"; + private static final int SHUTDOWN_TIME = 60; private CLIService cliService; private ThriftCLIService thriftCLIService; private PersistentEphemeralNode znode; @@ -115,6 +123,16 @@ private HttpServer webServer; // Web UI private TezSessionPoolManager tezSessionPoolManager; private WorkloadManager wm; + private Map<String, String> confsToPublish = new HashMap<>(); + private String serviceUri; + private boolean serviceDiscovery; + private boolean activePassiveHA; + private LeaderLatchListener leaderLatchListener; + private ExecutorService leaderActionsExecutorService; + private HS2ActivePassiveHARegistry hs2HARegistry; + private Hive sessionHive; + private String wmQueue; + private AtomicBoolean isLeader = new AtomicBoolean(false); public HiveServer2() { super(HiveServer2.class.getSimpleName()); @@ -128,13 +146,6 @@ public synchronized void init(HiveConf hiveConf) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) { MetricsFactory.init(hiveConf); } - - // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. - tezSessionPoolManager = TezSessionPoolManager.getInstance(); - tezSessionPoolManager.initTriggers(hiveConf); - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - tezSessionPoolManager.setupPool(hiveConf); - } } catch (Throwable t) { LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t); } @@ -179,22 +190,26 @@ public void run() { LlapRegistryService.getClient(hiveConf); } - Hive sessionHive = null; try { sessionHive = Hive.get(hiveConf); } catch (HiveException e) { throw new RuntimeException("Failed to get metastore connection", e); } - initializeWorkloadManagement(hiveConf, sessionHive); - // Create views registry HiveMaterializedViewsRegistry.get().init(); + wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim(); + + this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); + this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE); + // Setup web UI + final int webUIPort; + final String webHost; try { - int webUIPort = - hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT); + webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT); + webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST); // We disable web UI in tests unless the test is explicitly setting a // unique web ui port so that we don't mess up ptests.
boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST) && @@ -208,7 +223,7 @@ public void run() { LOG.info("Starting Web UI on port "+ webUIPort); HttpServer.Builder builder = new HttpServer.Builder("hiveserver2"); builder.setPort(webUIPort).setConf(hiveConf); - builder.setHost(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST)); + builder.setHost(webHost); builder.setMaxThreads( hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS)); builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE)); @@ -245,6 +260,11 @@ public void run() { builder.setSPNEGOKeytab(spnegoKeytab); builder.setUseSPNEGO(true); } + if (serviceDiscovery && activePassiveHA) { + builder.setContextAttribute("hs2.isLeader", isLeader); + builder.addServlet("leader", HS2LeadershipStatus.class); + // TODO: add /peers endpoint to list all HS2 instances with leadership info + } builder.addServlet("llap", LlapServlet.class); builder.setContextRootRewriteTarget("/hiveserver2.jsp"); @@ -255,53 +275,25 @@ public void run() { } catch (IOException ie) { throw new ServiceException(ie); } - // Add a shutdown hook for catching SIGTERM & SIGINT - ShutdownHookManager.addShutdownHook(new Runnable() { - @Override - public void run() { - hiveServer2.stop(); - } - }); - } - private void initializeWorkloadManagement(HiveConf hiveConf, Hive sessionHive) { - String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); - boolean hasQueue = wmQueue != null && !wmQueue.isEmpty(); - WMFullResourcePlan resourcePlan; try { - resourcePlan = sessionHive.getActiveResourcePlan(); - } catch (HiveException e) { - throw new RuntimeException(e); - } - if (hasQueue && resourcePlan == null - && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { - LOG.info("Creating a default resource plan for test"); - resourcePlan = createTestResourcePlan(); - } - if (resourcePlan == null) { - if (!hasQueue) { - LOG.info("Workload management is not enabled and there's no resource plan"); - return; // TODO: we could activate it anyway, similar to the below; in case someone - // wants to activate a resource plan for Tez triggers only w/o restart. - } - LOG.warn("Workload management is enabled but there's no resource plan"); - } - - if (hasQueue) { - // Initialize workload management. 
- LOG.info("Initializing workload management"); - try { - wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); - } catch (ExecutionException | InterruptedException e) { - throw new ServiceException("Unable to instantiate Workload Manager", e); + if (serviceDiscovery) { + serviceUri = getServerInstanceURI(); + addConfsToPublish(hiveConf, confsToPublish, serviceUri); + if (activePassiveHA) { + hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); + leaderLatchListener = new HS2LeaderLatchListener(this); + leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Leader Actions Handler Thread").build()); + hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); + } } + } catch (Exception e) { + throw new ServiceException(e); } - if (resourcePlan != null) { - tezSessionPoolManager.updateTriggers(resourcePlan); - LOG.info("Updated tez session pool manager with active resource plan: {}", - resourcePlan.getPlan().getName()); - } + // Add a shutdown hook for catching SIGTERM & SIGINT + ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop()); } private WMFullResourcePlan createTestResourcePlan() { @@ -315,10 +307,10 @@ private WMFullResourcePlan createTestResourcePlan() { return resourcePlan; } - public static boolean isHTTPTransportMode(HiveConf hiveConf) { + public static boolean isHTTPTransportMode(Configuration hiveConf) { String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); if (transportMode == null) { - transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); + transportMode = hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); } if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) { return true; @@ -326,8 +318,8 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) { return false; } - public static boolean isKerberosAuthMode(HiveConf hiveConf) { - String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION); + public static boolean isKerberosAuthMode(Configuration hiveConf) { + String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname); if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) { return true; } @@ -367,7 +359,7 @@ public static boolean isKerberosAuthMode(HiveConf hiveConf) { * @param hiveConf * @throws Exception */ - private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + private void addServerInstanceToZooKeeper(HiveConf hiveConf, Map confsToPublish) throws Exception { String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); String instanceURI = getServerInstanceURI(); @@ -408,8 +400,8 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) { // HiveServer2 configs that this instance will publish to ZooKeeper, // so that the clients can read these and configure themselves properly. 
- Map confsToPublish = new HashMap(); - addConfsToPublish(hiveConf, confsToPublish); + + addConfsToPublish(hiveConf, confsToPublish, instanceURI); // Publish configs for this instance as the data on the node znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish); } else { @@ -446,10 +438,12 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { * Add conf keys, values that HiveServer2 will publish to ZooKeeper. * @param hiveConf */ - private void addConfsToPublish(HiveConf hiveConf, Map confsToPublish) { + private void addConfsToPublish(HiveConf hiveConf, Map confsToPublish, String serviceUri) { // Hostname confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST)); + // Hostname:port + confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri); // Transport mode confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)); @@ -499,6 +493,10 @@ private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception { } } + public boolean isLeader() { + return isLeader.get(); + } + /** * The watcher class which sets the de-register flag when the znode corresponding to this server * instance is deleted. Additionally, it shuts down the server if there are no more active client @@ -568,10 +566,16 @@ public synchronized void start() { super.start(); // If we're supporting dynamic service discovery, we'll add the service uri for this // HiveServer2 instance to Zookeeper as a znode. - HiveConf hiveConf = this.getHiveConf(); - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + HiveConf hiveConf = getHiveConf(); + if (serviceDiscovery) { try { - addServerInstanceToZooKeeper(hiveConf); + if (activePassiveHA) { + hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService); + hs2HARegistry.start(); + LOG.info("HS2 HA registry started"); + } else { + addServerInstanceToZooKeeper(hiveConf, confsToPublish); + } } catch (Exception e) { LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e); throw new ServiceException(e); @@ -586,22 +590,115 @@ public synchronized void start() { throw new ServiceException(e); } } + + if (!activePassiveHA) { + LOG.info("HS2 interactive HA not enabled. Starting tez sessions.."); + startOrReconnectTezSessions(); + } else { + LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader."); + } + } + + private static class HS2LeaderLatchListener implements LeaderLatchListener { + private HiveServer2 hiveServer2; + + HS2LeaderLatchListener(final HiveServer2 hs2) { + this.hiveServer2 = hs2; + } + + @Override + public void isLeader() { + LOG.info("HS2 instance {} became the LEADER. Starting/Reconnecting tez sessions..", hiveServer2.serviceUri); + hiveServer2.isLeader.set(true); + hiveServer2.startOrReconnectTezSessions(); + } + + @Override + public void notLeader() { + LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); + hiveServer2.isLeader.set(false); + hiveServer2.stopOrDisconnectTezSessions(); + } + } + + // TODO: this is handled in a separate thread in case of HSI-HA, pass threadlocal SessionState? 
+ private void startOrReconnectTezSessions() { + LOG.info("Starting/Reconnecting tez sessions.."); + // TODO: add tez session reconnect after TEZ-3875 + WMFullResourcePlan resourcePlan = null; + if (!StringUtils.isEmpty(wmQueue)) { + try { + resourcePlan = sessionHive.getActiveResourcePlan(); + } catch (HiveException e) { + throw new RuntimeException(e); + } + + if (resourcePlan == null) { + if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { + LOG.error("Cannot activate workload management - no active resource plan"); + } else { + LOG.info("Creating a default resource plan for test"); + resourcePlan = createTestResourcePlan(); + } + } + } + initAndStartTezSessionPoolManager(resourcePlan); + initAndStartWorkloadManager(resourcePlan); + } + + private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) { + // Start the Tez session pool here in start() so that the parent SessionState (set up through CLIService) is + // initialized first; otherwise SessionState.get() can return null during createTezDir. + try { + // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. + LOG.info("Initializing tez session pool manager"); + tezSessionPoolManager = TezSessionPoolManager.getInstance(); + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + tezSessionPoolManager.setupPool(hiveConf); + } + tezSessionPoolManager.startPool(hiveConf, resourcePlan); + LOG.info("Tez session pool manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to setup tez session pool", e); + } + } + + private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) { + if (!StringUtils.isEmpty(wmQueue)) { + if (resourcePlan != null) { + // Initialize workload management. + LOG.info("Initializing workload management"); + try { + wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); + wm.start(); + LOG.info("Workload manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to instantiate and start Workload Manager", e); + } + } + } + } + + private void stopOrDisconnectTezSessions() { + LOG.info("Stopping/Disconnecting tez sessions."); + // There should already be an instance of the session pool manager. + // If not, ignoring is fine while stopping HiveServer2. if (tezSessionPoolManager != null) { try { - tezSessionPoolManager.startPool(); - LOG.info("Started tez session pool manager.."); + tezSessionPoolManager.stop(); + LOG.info("Stopped tez session pool manager."); } catch (Exception e) { - LOG.error("Error starting tez session pool manager: ", e); - throw new ServiceException(e); + LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); } } if (wm != null) { try { - wm.start(); - LOG.info("Started workload manager.."); + wm.stop(); + LOG.info("Stopped workload manager."); } catch (Exception e) { - LOG.error("Error starting workload manager", e); - throw new ServiceException(e); + LOG.error("Workload manager stop had an error during stop of HiveServer2. 
" + + "Shutting down HiveServer2 anyway.", e); } } } @@ -611,6 +708,12 @@ public synchronized void stop() { LOG.info("Shutting down HiveServer2"); HiveConf hiveConf = this.getHiveConf(); super.stop(); + if (hs2HARegistry != null) { + hs2HARegistry.unregisterLeaderLatchListener(leaderLatchListener); + shutdownExecutor(leaderActionsExecutorService); + hs2HARegistry.stop(); + LOG.info("HS2 HA registry stopped"); + } if (webServer != null) { try { webServer.stop(); @@ -629,32 +732,15 @@ public synchronized void stop() { } } // Remove this server instance from ZooKeeper if dynamic service discovery is set - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + if (serviceDiscovery && !activePassiveHA) { try { removeServerInstanceFromZooKeeper(); } catch (Exception e) { LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e); } } - // There should already be an instance of the session pool manager. - // If not, ignoring is fine while stopping HiveServer2. - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) && - tezSessionPoolManager != null) { - try { - tezSessionPoolManager.stop(); - } catch (Exception e) { - LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } - if (wm != null) { - try { - wm.stop(); - } catch (Exception e) { - LOG.error("Workload manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } + + stopOrDisconnectTezSessions(); if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { try { @@ -665,6 +751,22 @@ public synchronized void stop() { } } + private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) { + leaderActionsExecutorService.shutdown(); + try { + if (!leaderActionsExecutorService.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { + LOG.warn("Executor service did not terminate in the specified time {} sec", SHUTDOWN_TIME); + List droppedTasks = leaderActionsExecutorService.shutdownNow(); + LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); + } + } catch (InterruptedException e) { + LOG.warn("Executor service did not terminate in the specified time {} sec. Exception: {}", SHUTDOWN_TIME, + e.getMessage()); + List droppedTasks = leaderActionsExecutorService.shutdownNow(); + LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); + } + } + @VisibleForTesting public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) { diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java new file mode 100644 index 0000000..b31d63c --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import org.apache.hadoop.hive.registry.ServiceInstanceSet; + +public interface HiveServer2HAInstanceSet extends ServiceInstanceSet { + + /** + * In Active/Passive setup, returns current active leader. + * + * @return leader instance + */ + HiveServer2Instance getLeader(); +} diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java new file mode 100644 index 0000000..61b8bc6 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; + +import com.google.common.base.Preconditions; + +public class HiveServer2Instance extends ServiceInstanceBase { + private boolean isLeader; + private String transportMode; + private String httpEndpoint; + + HiveServer2Instance(final ServiceRecord srv, final String endPointName) throws IOException { + super(srv, endPointName); + + Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT); + if (activeEndpoint != null) { + isLeader = true; + } + Endpoint passiveEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.PASSIVE_ENDPOINT); + if (passiveEndpoint != null) { + isLeader = false; + } + Preconditions.checkArgument(activeEndpoint == null || passiveEndpoint == null, + "Incorrect service record. 
Both active and passive endpoints cannot be non-null!"); + this.transportMode = srv.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); + if ("http".equalsIgnoreCase(transportMode)) { + this.httpEndpoint = srv.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname); + } + } + + public boolean isLeader() { + return isLeader; + } + + public String getTransportMode() { + return transportMode; + } + + public String getHttpEndpoint() { + return httpEndpoint; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + HiveServer2Instance other = (HiveServer2Instance) o; + return super.equals(o) && isLeader == other.isLeader + && Objects.equals(transportMode, other.transportMode) + && Objects.equals(httpEndpoint, other.httpEndpoint); + } + + @Override + public int hashCode() { + return super.hashCode() + Objects.hashCode(isLeader) + Objects.hashCode(transportMode) + Objects.hashCode(httpEndpoint); + } + + @Override + public String toString() { + String result = "instanceId: " + getWorkerIdentity() + " isLeader: " + isLeader + " host: " + getHost() + + " port: " + getRpcPort() + " transportMode: " + transportMode; + if (httpEndpoint != null) { + result += " httpEndpoint: " + httpEndpoint; + } + return result; + } +} diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java new file mode 100644 index 0000000..1ec63ce --- /dev/null +++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.servlet; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Returns the leadership status of this HS2 instance. The response body is a JSON boolean: true when this instance + * currently holds the leader latch, false otherwise.
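 + * <p> + * A monitoring probe could poll this endpoint, assuming the servlet is mounted at /leader on the web UI port + * (hypothetical host; 10002 is the default hive.server2.webui.port): {@code curl http://hs2-host:10002/leader}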
+ */ +public class HS2LeadershipStatus extends HttpServlet { + private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class); + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + ServletContext ctx = getServletContext(); + AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader"); + LOG.info("Returning isLeader: {}", isLeader); + // set the status before writing the body; writing to the writer may commit the response + response.setStatus(HttpServletResponse.SC_OK); + ObjectMapper mapper = new ObjectMapper(); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), isLeader); + } +}
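For reviewers, a minimal client-side sketch of how the registry pieces in this patch fit together. It assumes the configs introduced above are set (dynamic service discovery, Active/Passive HA, ZooKeeper quorum, HA registry namespace) and reachable from the client; the FindActiveHS2 class and its output strings are illustrative, not part of the patch:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;

public class FindActiveHS2 {
  public static void main(String[] args) throws Exception {
    // getClient() starts (and caches, per namespace) a read-only registry backed by the same ZK paths
    HiveConf conf = new HiveConf();
    HS2ActivePassiveHARegistry registry = HS2ActivePassiveHARegistryClient.getClient(conf);
    // getLeader() scans registered instances for the one exposing the active endpoint
    HiveServer2Instance leader = registry.getLeader();
    if (leader == null) {
      System.out.println("No leader elected yet; " + registry.size() + " instance(s) registered");
    } else {
      System.out.println("Active HS2: " + leader.getHost() + ":" + leader.getRpcPort()
          + " transportMode: " + leader.getTransportMode());
    }
  }
}

Note that getLeader() iterates the client-side instance cache, so a leader-latch transition on the servers may be observed with a short delay, until the PathChildrenCache picks up the updated service record.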