diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 907d064..73f4d60 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1916,6 +1916,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.MILLISECONDS),
         "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
         "if a heartbeat is not sent in the timeout."),
+    HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "ZooKeeper client's connection timeout (in seconds). After connection timeout * hive.zookeeper.connection.max.retries,\n" +
+        "with exponential backoff, the Curator client considers the connection to ZooKeeper lost."),
     HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
         "The parent node under which all ZooKeeper nodes are created."),
     HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
@@ -2465,6 +2469,13 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If true, the HiveServer2 WebUI will be secured with PAM."),

     // Tez session settings
+    HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false,
+        "Whether HiveServer2 Active/Passive High Availability should be enabled when Hive Interactive sessions are enabled. " +
+        "This also requires hive.server2.support.dynamic.service.discovery to be enabled."),
+    HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace",
+        "hs2ActivePassiveHA",
+        "When HiveServer2 Active/Passive High Availability is enabled, this namespace is used for registering HS2\n" +
+        "instances with ZooKeeper."),
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queue to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),
diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml
index 337535a..1e40d9d 100644
--- a/itests/hive-minikdc/pom.xml
+++ b/itests/hive-minikdc/pom.xml
@@ -117,6 +117,26 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index fb31fd4..85a6145 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -53,7 +53,26 @@
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
new file mode 100644
index 0000000..26acbd7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.apache.hive.service.servlet.HS2Peers;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestActivePassiveHA {
+  private MiniHS2 miniHS2_1 = null;
+  private MiniHS2 miniHS2_2 = null;
+  private static TestingServer zkServer;
+  private Connection hs2Conn = null;
+  private HiveConf hiveConf1;
+  private HiveConf hiveConf2;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    MiniHS2.cleanupLocalDir();
+    zkServer = new TestingServer();
+    Class.forName(MiniHS2.getJdbcDriverName());
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (zkServer != null) {
+      zkServer.close();
+      zkServer = null;
+    }
+    MiniHS2.cleanupLocalDir();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hiveConf1 = new HiveConf();
+    hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    // Set up zookeeper dynamic service discovery configs
+    setHAConfigs(hiveConf1);
+    miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build();
+    hiveConf2 = new HiveConf();
+    hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    // Set up zookeeper dynamic service discovery configs
+    setHAConfigs(hiveConf2);
+    miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (hs2Conn != null) {
+      hs2Conn.close();
+    }
+    if ((miniHS2_1 != null) && miniHS2_1.isStarted()) {
+      miniHS2_1.stop();
+    }
+    if ((miniHS2_2 != null) && miniHS2_2.isStarted()) {
+      miniHS2_2.stop();
+    }
+  }
+
+  private static void setHAConfigs(Configuration conf) {
+    conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
+    conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString());
+    final String zkRootNamespace = "hs2test";
+    conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace);
+    conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
+    conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS);
+    conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS);
+    conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1);
+  }
+
+  @Test(timeout = 60000)
+  public void testActivePassive() throws Exception {
+    Map<String, String> confOverlay = new HashMap<>();
+    hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_1.start(confOverlay);
+    while (!miniHS2_1.isStarted()) {
+      Thread.sleep(100);
+    }
+
+    hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_2.start(confOverlay);
+    while (!miniHS2_2.isStarted()) {
+      Thread.sleep(100);
+    }
+
+    assertEquals(true, miniHS2_1.isLeader());
+    String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("true", sendGet(url));
+
+    assertEquals(false, miniHS2_2.isLeader());
+    url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("false", sendGet(url));
+
+    url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+    String resp = sendGet(url);
+    ObjectMapper objectMapper = new ObjectMapper();
+    HS2Peers.HS2Instances hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+    int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+    assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      if (hsi.getRpcPort() == port1) {
+        assertEquals(true, hsi.isLeader());
+      } else {
+        assertEquals(false, hsi.isLeader());
+      }
+    }
+
+    Configuration conf = new Configuration();
+    setHAConfigs(conf);
+    HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    List<HiveServer2Instance> hs2Instances = new ArrayList<>(client.getAll());
+    assertEquals(2, hs2Instances.size());
+    List<HiveServer2Instance> leaders = new ArrayList<>();
+    List<HiveServer2Instance> standby = new ArrayList<>();
+    for (HiveServer2Instance instance : hs2Instances) {
+      if (instance.isLeader()) {
+        leaders.add(instance);
+      } else {
+        standby.add(instance);
+      }
+    }
+    assertEquals(1, leaders.size());
+    assertEquals(1, standby.size());
+
+    miniHS2_1.stop();
+
+    while (!miniHS2_2.isStarted()) {
+      Thread.sleep(100);
+    }
+    assertEquals(true, miniHS2_2.isLeader());
+    url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("true", sendGet(url));
+
+    while (client.getAll().size() != 1) {
+      Thread.sleep(100);
+    }
+
+    client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    hs2Instances = new ArrayList<>(client.getAll());
+    assertEquals(1, hs2Instances.size());
+    leaders = new ArrayList<>();
+    standby = new ArrayList<>();
+    for (HiveServer2Instance instance : hs2Instances) {
+      if (instance.isLeader()) {
+        leaders.add(instance);
+      } else {
+        standby.add(instance);
+      }
+    }
+    assertEquals(1, leaders.size());
+    assertEquals(0, standby.size());
+
+    url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+    resp = sendGet(url);
+    objectMapper = new ObjectMapper();
+    hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+    int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+    assertEquals(1, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      if (hsi.getRpcPort() == port2) {
+        assertEquals(true, hsi.isLeader());
+      } else {
+        assertEquals(false, hsi.isLeader());
+      }
+    }
+
+    // start 1st server again
+    hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_1.start(confOverlay);
+
+    while (!miniHS2_1.isStarted()) {
+      Thread.sleep(100);
+    }
+    assertEquals(false, miniHS2_1.isLeader());
+    url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("false", sendGet(url));
+
+    while (client.getAll().size() != 2) {
+      Thread.sleep(100);
+    }
+
+    client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    hs2Instances = new ArrayList<>(client.getAll());
+    assertEquals(2, hs2Instances.size());
+    leaders = new ArrayList<>();
+    standby = new ArrayList<>();
+    for (HiveServer2Instance instance : hs2Instances) {
+      if (instance.isLeader()) {
+        leaders.add(instance);
+      } else {
+        standby.add(instance);
+      }
+    }
+    assertEquals(1, leaders.size());
+    assertEquals(1, standby.size());
+
+    url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+    resp = sendGet(url);
+    objectMapper = new ObjectMapper();
+    hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+    port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+    assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      if (hsi.getRpcPort() == port2) {
+        assertEquals(true, hsi.isLeader());
+      } else {
+        assertEquals(false, hsi.isLeader());
+      }
+    }
+  }
+
+  private String sendGet(String url) throws Exception {
+    URL obj = new URL(url);
+    HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+    con.setRequestMethod("GET");
+    BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
+    String inputLine;
+    StringBuilder response = new StringBuilder();
+    while ((inputLine = in.readLine()) != null) {
+      response.append(inputLine);
+    }
+    in.close();
+    return response.toString();
+  }
+}
\ No newline at end of file
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
index 6cab8cd..d21b764 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
@@ -35,14 +35,16 @@
   private String hostname;
   private int binaryPort;
   private int httpPort;
+  private int webPort;
   private boolean startedHiveService = false;
  private List<String> addedProperties = new ArrayList<String>();
 
-  public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) {
+  public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) {
     this.hiveConf = hiveConf;
     this.hostname = hostname;
     this.binaryPort = binaryPort;
     this.httpPort = httpPort;
+    this.webPort = webPort;
   }
 
   /**
@@ -136,6 +138,10 @@ public int getHttpPort() {
     return httpPort;
   }
 
+  public int getWebPort() {
+    return webPort;
+  }
+
   public boolean isStarted() {
     return startedHiveService;
   }
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 8bbf8a4..997726c 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -217,6 +217,8 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils
             .findFreePort()),
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils
+            .findFreePort()),
+        (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils
             .findFreePort()));
     hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l);
     hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10,
@@ -306,6 +308,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
     hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost());
     hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort());
     hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort());
+    hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort());
 
     Path scratchDir = new Path(baseFsDir, "scratch");
     // Create root scratchdir with write all, so that user impersonation has no issues.
@@ -404,6 +407,10 @@ public void cleanup() {
 
   }
 
+  public boolean isLeader() {
+    return hiveServer2.isLeader();
+  }
+
   public CLIServiceClient getServiceClient() {
     verifyStarted();
     return getServiceClientInternal();
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 5d7f813..6178b4b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,13 +14,16 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+
+import org.apache.hadoop.hive.registry.ServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /**
  * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
  */
-public interface ServiceRegistry {
+public interface ServiceRegistry<T extends ServiceInstance> {
 
   /**
    * Start the service registry
@@ -49,14 +52,14 @@
    * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
    *                              started yet. 0 means do not wait.
    */
-  LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
+  ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws
+      IOException;
 
   /**
    * Adds state change listeners for service instances.
    * @param listener - state change listener
    */
-  void registerStateChangeListener(
-      ServiceInstanceStateChangeListener<T> listener) throws IOException;
+  void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException;
 
   /**
   * @return The application ID of the LLAP cluster.
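
Editor's note (not part of the patch): a minimal sketch of how a client can use the registry API exercised by TestActivePassiveHA above to locate the current active HS2 instance. The ZooKeeper quorum string is a placeholder; everything else uses only calls that appear in the test.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;

public class FindActiveHS2 {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same discovery settings the test applies in setHAConfigs()
    conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
    conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
    conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "zk1:2181,zk2:2181,zk3:2181"); // placeholder quorum
    HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
    // Exactly one instance is expected to report isLeader() == true
    for (HiveServer2Instance instance : client.getAll()) {
      System.out.println(instance.getHost() + ":" + instance.getRpcPort()
          + (instance.isLeader() ? " (leader)" : " (standby)"));
    }
  }
}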
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index c88198f..f99d86c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -267,8 +268,7 @@ public LlapServiceInstanceSet getInstances(String component, long timeoutMs) thr
   }
 
   @Override
-  public void registerStateChangeListener(
-      final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) {
+  public void registerStateChangeListener(final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException {
     // nothing to set
     LOG.warn("Callbacks for instance state changes are not supported in fixed registry.");
   }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 80a6aba..3bda40b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -35,7 +36,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
 
-  private ServiceRegistry registry = null;
+  private ServiceRegistry<LlapServiceInstance> registry = null;
   private final boolean isDaemon;
   private boolean isDynamic = false;
   private String identity = "(pending)";
@@ -136,7 +137,7 @@ public LlapServiceInstanceSet getInstances() throws IOException {
   }
 
   public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
-    return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
+    return (LlapServiceInstanceSet) this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
   }
 
   public void registerStateChangeListener(
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8339230..f5d6202 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -53,7 +53,7 @@
 import org.slf4j.LoggerFactory;
 
 public class LlapZookeeperRegistryImpl
-    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry {
+    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
 
   /**
@@ -65,8 +65,6 @@
   private static final String IPC_LLAP = "llap";
   private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
   private final static String NAMESPACE_PREFIX = "llap-";
-  private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String WORKER_PREFIX = "worker-";
   private static final String SLOT_PREFIX = "slot-";
 
   private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
@@ -79,7 +77,7 @@
   public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
     super(instanceName, conf, HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
-        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
         LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null,
         HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE),
@@ -225,7 +223,7 @@ public Resource getResource() {
 
     @Override
     public String toString() {
-      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
+      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() +
          " with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
          ", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]";
     }
@@ -327,9 +325,9 @@ private static String extractWorkerIdFromSlot(ChildData childData) {
       if (data == null) continue;
       String nodeName = extractNodeName(childData);
       if (nodeName.startsWith(WORKER_PREFIX)) {
-        Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath());
+        LlapServiceInstance instances = getInstanceByPath(childData.getPath());
         if (instances != null) {
-          unsorted.addAll(instances);
+          unsorted.add(instances);
         }
       } else if (nodeName.startsWith(SLOT_PREFIX)) {
         slotByWorker.put(extractWorkerIdFromSlot(childData),
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 32d5caa..3208e21 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -55,7 +56,7 @@
   private final SocketFactory socketFactory;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
-  private LlapServiceInstanceSet activeInstances;
+  private ServiceInstanceSet<LlapServiceInstance> activeInstances;
   private Collection<LlapServiceInstance> lastKnownInstances;
   private LlapManagementProtocolClientImpl client;
   private LlapServiceInstance clientInstance;
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
new file mode 100644
index 0000000..e069e43
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+public class RegistryUtilities {
+  private static final String LOCALHOST = "localhost";
+
+  /**
+   * Will return hostname stored in InetAddress.
+   *
+   * @return hostname
+   */
+  public static String getHostName() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      return LOCALHOST;
+    }
+  }
+
+  /**
+   * Will return FQDN of the host after doing a reverse DNS lookup.
+   *
+   * @return FQDN of host
+   */
+  public static String getCanonicalHostName() {
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      return LOCALHOST;
+    }
+  }
+
+  public static String getUUID() {
+    return String.valueOf(UUID.randomUUID());
+  }
+}
\ No newline at end of file
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
index 908b3bb..4493e99 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
@@ -21,27 +21,27 @@
    * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
    * back on the same host/port
    */
-  public abstract String getWorkerIdentity();
+  String getWorkerIdentity();
 
   /**
    * Hostname of the service instance
    *
-   * @return
+   * @return service hostname
    */
-  public abstract String getHost();
+  String getHost();
 
   /**
    * RPC Endpoint for service instance
-   *
-   * @return
+   *
+   * @return rpc port
   */
-  public int getRpcPort();
+  int getRpcPort();
 
   /**
    * Config properties of the Service Instance (llap.daemon.*)
-   *
-   * @return
+   *
+   * @return properties
   */
-  public abstract Map<String, String> getProperties();
+  Map<String, String> getProperties();
 }
\ No newline at end of file
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
index 34fba5c..63178cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
@@ -29,15 +29,15 @@
    * The worker identity does not collide between restarts, so each restart will have a unique id,
    * while having the same host/ip pair.
    *
-   * @return
+   * @return instance list
    */
   Collection<InstanceType> getAll();
 
   /**
    * Get an instance by worker identity.
    *
-   * @param name
-   * @return
+   * @param name worker id
+   * @return instance
    */
   InstanceType getInstance(String name);
 
@@ -46,13 +46,13 @@
    *
    * The list could include dead and alive instances.
    *
-   * @param host
-   * @return
+   * @param host hostname
+   * @return instance list
    */
   Set<InstanceType> getByHost(String host);
 
   /**
-   * Get number of instances in the currently availabe.
+   * Get the number of instances currently available.
    *
    * @return - number of instances
    */
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
index db3d788..de8910c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
@@ -15,6 +15,8 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
+
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
@@ -25,26 +27,27 @@
 public class ServiceInstanceBase implements ServiceInstance {
   private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class);
+  private String host;
+  private int rpcPort;
+  private String workerIdentity;
+  private Map<String, String> properties;
 
-  protected final ServiceRecord srv;
-  protected final String host;
-  protected final int rpcPort;
+  // empty c'tor to make jackson happy
+  public ServiceInstanceBase() {
+  }
 
-  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
-    this.srv = srv;
+  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Working with ServiceRecord: {}", srv);
     }
-    final Endpoint rpc = srv.getInternalEndpoint(rpcName);
-
-    this.host =
-        RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-            AddressTypes.ADDRESS_HOSTNAME_FIELD);
-    this.rpcPort =
-        Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-            AddressTypes.ADDRESS_PORT_FIELD));
+    final Endpoint rpc = srv.getInternalEndpoint(rpcName);
+    this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+        AddressTypes.ADDRESS_HOSTNAME_FIELD);
+    this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+        AddressTypes.ADDRESS_PORT_FIELD));
+    this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    this.properties = srv.attributes();
   }
 
   @Override
@@ -57,17 +60,19 @@ public boolean equals(Object o) {
     }
 
     ServiceInstanceBase other = (ServiceInstanceBase) o;
-    return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+    return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity())
+        && Objects.equals(host, other.host)
+        && Objects.equals(rpcPort, other.rpcPort);
   }
 
   @Override
   public int hashCode() {
-    return getWorkerIdentity().hashCode();
+    return getWorkerIdentity().hashCode() + (31 * host.hashCode()) + (31 * rpcPort);
   }
 
   @Override
   public String getWorkerIdentity() {
-    return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    return workerIdentity;
   }
 
   @Override
@@ -82,12 +87,28 @@ public int getRpcPort() {
 
   @Override
   public Map<String, String> getProperties() {
-    return srv.attributes();
+    return properties;
+  }
+
+  public void setHost(final String host) {
+    this.host = host;
+  }
+
+  public void setRpcPort(final int rpcPort) {
+    this.rpcPort = rpcPort;
+  }
+
+  public void setWorkerIdentity(final String workerIdentity) {
+    this.workerIdentity = workerIdentity;
+  }
+
+  public void setProperties(final Map<String, String> properties) {
+    this.properties = properties;
   }
 
   @Override
   public String toString() {
     return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host="
-      + host + ":" + rpcPort + "]";
+        + host + ":" + rpcPort + "]";
   }
 }
\ No newline at end of file
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index 0724cf5..d09cb24 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -13,29 +13,26 @@
  */
 package org.apache.hadoop.hive.registry.impl;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
 import org.apache.commons.codec.binary.Base64;
-
-import com.google.common.io.ByteStreams;
-
-import org.apache.tez.common.security.JobTokenIdentifier;
-
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
 
 public class TezAmInstance extends ServiceInstanceBase {
   private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class);
   private final int pluginPort;
   private Token<JobTokenIdentifier> token;
 
-  public TezAmInstance(ServiceRecord srv) throws IOException {
+  TezAmInstance(ServiceRecord srv) throws IOException {
     super(srv, TezAmRegistryImpl.IPC_TEZCLIENT);
     final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN);
     if (plugin != null) {
@@ -76,7 +73,7 @@ public String getPluginTokenJobId() {
 
   @Override
   public String toString() {
-    return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort +
+    return "TezAmInstance [" + getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() +
         ", pluginPort=" + pluginPort + ", token=" + token + "]";
   }
 
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index 417e571..ab02cf4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -37,21 +37,19 @@
   static final String AM_SESSION_ID = "am.session.id",
       AM_PLUGIN_TOKEN = "am.plugin.token", AM_PLUGIN_JOBID = "am.plugin.jobid";
 
   private final static String NAMESPACE_PREFIX = "tez-am-";
-  private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String WORKER_PREFIX = "worker-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
 
   private final String registryName;
 
-  public static TezAmRegistryImpl create(Configuration conf, boolean b) {
+  public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
     String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
     return StringUtils.isBlank(amRegistryName) ? null
-        : new TezAmRegistryImpl(amRegistryName, conf, true);
+        : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk);
   }
 
   private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) {
-    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
         useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null,
         HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL),
         HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE),
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 17269dd..e7227a8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -13,13 +13,7 @@
  */
 package org.apache.hadoop.hive.registry.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -34,6 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -44,12 +39,15 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.registry.RegistryUtilities;
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
@@ -65,6 +63,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This is currently used for implementation inheritance only; it doesn't provide a unified flow
  * into which one can just plug a few abstract method implementations, because providing one with
@@ -77,16 +81,18 @@
   private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class);
   private final static String SASL_NAMESPACE = "sasl";
   private final static String UNSECURE_NAMESPACE = "unsecure";
-
-  static final String UNIQUE_IDENTIFIER = "registry.unique.id";
-  private static final UUID uniq = UUID.randomUUID();
+  protected final static String USER_SCOPE_PATH_PREFIX = "user-";
+  protected static final String WORKER_PREFIX = "worker-";
+  protected static final String WORKER_GROUP = "workers";
+  public static final String UNIQUE_IDENTIFIER = "registry.unique.id";
+  protected static final UUID UNIQUE_ID = UUID.randomUUID();
+  private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls();
 
   protected final Configuration conf;
   protected final CuratorFramework zooKeeperClient;
-  // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
   // workersPath is the directory path where all the worker znodes are located.
   protected final String workersPath;
-  private final String userPathPrefix, workerNodePrefix;
+  private final String workerNodePrefix;
 
   protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
 
@@ -99,7 +105,9 @@
   private final String disableMessage;
 
   private final Lock instanceCacheLock = new ReentrantLock();
-  private final Map<String, Set<InstanceType>> pathToInstanceCache;
+  // there can be only one instance per path
+  private final Map<String, InstanceType> pathToInstanceCache;
+  // there can be multiple instances per node
+  private final Map<String, Set<InstanceType>> nodeToInstanceCache;
 
   // The registration znode.
@@ -109,29 +117,22 @@
   private PathChildrenCache instancesCache; // Created on demand.
 
   /** Local hostname. */
-  protected static final String hostname;
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
+  protected static final String hostname = RegistryUtilities.getCanonicalHostName();
 
   /**
   * @param rootNs A single root namespace override. Not recommended.
-   * @param nsPrefix The namespace prefix to use with default namespaces.
+   * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure'
+   *                 to namespace prefix to get effective root namespace).
   * @param userScopePathPrefix The prefix to use for the user-specific part of the path.
   * @param workerPrefix The prefix to use for each worker znode.
+   * @param workerGroup group name to use for all workers
   * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed.
   * @param zkPrincipal ZK security principal.
   * @param zkKeytab ZK security keytab.
   * @param aclsConfig A config setting to use to determine if ACLs should be verified.
   */
   public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix,
-      String userScopePathPrefix, String workerPrefix,
+      String userScopePathPrefix, String workerPrefix, String workerGroup,
       String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) {
     this.conf = new Configuration(conf);
     this.saslLoginContextName = zkSaslLoginContextName;
@@ -145,29 +146,52 @@
       this.disableMessage = "";
     }
     this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    String zkEnsemble = getQuorumServers(this.conf);
     this.encoder = new RegistryUtils.ServiceRecordMarshal();
-    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
-        ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
-        ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
-    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
 
     // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
     // worker-0000000 is the sequence number which will be retained until session timeout. If a
     // worker does not respond due to communication interruptions it will retain the same sequence
     // number when it returns back. If session timeout expires, the node will be deleted and new
     // addition of the same node (restart) will get next sequence number
-    this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf);
-    this.workerNodePrefix = workerPrefix;
-    this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers";
+    final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf);
+    this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix;
+    this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup);
     this.instancesCache = null;
     this.stateChangeListeners = new HashSet<>();
     this.pathToInstanceCache = new ConcurrentHashMap<>();
     this.nodeToInstanceCache = new ConcurrentHashMap<>();
+    final String namespace = getRootNamespace(rootNs, nsPrefix);
+    ACLProvider aclProvider;
+    // get acl provider for most outer path that is non-null
+    if (userPathPrefix == null) {
+      if (instanceName == null) {
+        if (workerGroup == null) {
+          aclProvider = getACLProviderForZKPath(namespace);
+        } else {
+          aclProvider = getACLProviderForZKPath(workerGroup);
+        }
+      } else {
+        aclProvider = getACLProviderForZKPath(instanceName);
+      }
+    } else {
+      aclProvider = getACLProviderForZKPath(userScopePathPrefix);
+    }
+    this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider);
+    this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+  }
+
+  public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) {
+    final boolean isSecure = UserGroupInformation.isSecurityEnabled();
+    String rootNs = userProvidedNamespace;
+    if (rootNs == null) {
+      rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE);
+    }
+    return rootNs;
+  }
+
+  private ACLProvider getACLProviderForZKPath(String zkPath) {
     final boolean isSecure = UserGroupInformation.isSecurityEnabled();
-    ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    return new ACLProvider() {
       @Override
       public List<ACL> getDefaultAcl() {
         // We always return something from getAclForPath so this should not happen.
@@ -177,31 +201,40 @@
 
       @Override
       public List<ACL> getAclForPath(String path) {
-        if (!isSecure || path == null || !path.contains(userPathPrefix)) {
+        if (!isSecure || path == null || !path.contains(zkPath)) {
           // No security or the path is below the user path - full access.
           return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
         }
         return createSecureAcls();
       }
     };
-    if (rootNs == null) {
-      rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path.
-    }
+  }
+
+  private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+    String zkEnsemble = getQuorumServers(conf);
+    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
 
     // Create a CuratorFramework instance to be used as the ZooKeeper client
     // Use the zooKeeperAclProvider to create appropriate ACLs
-    this.zooKeeperClient = CuratorFrameworkFactory.builder()
-        .connectString(zkEnsemble)
-        .sessionTimeoutMs(sessionTimeout)
-        .aclProvider(zooKeeperAclProvider)
-        .namespace(rootNs)
-        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
-        .build();
+    return CuratorFrameworkFactory.builder()
+        .connectString(zkEnsemble)
+        .sessionTimeoutMs(sessionTimeout)
+        .connectionTimeoutMs(connectionTimeout)
+        .aclProvider(zooKeeperAclProvider)
+        .namespace(namespace)
+        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+        .build();
   }
 
   private static List<ACL> createSecureAcls() {
     // Read all to the world
-    List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+    List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
     // Create/Delete/Write/Admin to creator
     nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
     return nodeAcls;
@@ -211,9 +244,9 @@
   /**
    * Get the ensemble server addresses from the configuration. The format is: host1:port,
   * host2:port..
    *
-   * @param conf
+   * @param conf configuration
    **/
-  private String getQuorumServers(Configuration conf) {
+  private static String getQuorumServers(Configuration conf) {
     String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
     String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
         ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
@@ -238,7 +271,7 @@
 
   protected final String registerServiceRecord(ServiceRecord srv) throws IOException {
     // restart sensitive instance id
-    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+    srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
 
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
@@ -275,11 +308,28 @@
       CloseableUtils.closeQuietly(znode);
       throw (e instanceof IOException) ? (IOException)e : new IOException(e);
     }
-    return uniq.toString();
+    return UNIQUE_ID.toString();
   }
 
+  protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+    try {
+      znode.setData(encoder.toBytes(srv));
+
+      if (doCheckAcls) {
+        try {
+          checkAndSetAcls();
+        } catch (Exception ex) {
+          throw new IOException("Error validating or setting ACLs. " + disableMessage, ex);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to update znode with new service record", e);
+      CloseableUtils.closeQuietly(znode);
+      throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+    }
+  }
 
-  protected final void initializeWithoutRegisteringInternal() throws IOException {
+  final void initializeWithoutRegisteringInternal() throws IOException {
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
       try {
@@ -345,8 +395,8 @@ private void setUpAcls(String path) throws Exception {
 
   private void addToCache(String path, String host, InstanceType instance) {
     instanceCacheLock.lock();
     try {
-      putInCache(path, pathToInstanceCache, instance);
-      putInCache(host, nodeToInstanceCache, instance);
+      putInInstanceCache(path, pathToInstanceCache, instance);
+      putInNodeCache(host, nodeToInstanceCache, instance);
     } finally {
       instanceCacheLock.unlock();
     }
@@ -368,14 +418,19 @@ private void removeFromCache(String path, String host) {
         path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
   }
 
-  private void putInCache(String key, Map<String, Set<InstanceType>> cache,
+  private void putInInstanceCache(String key, Map<String, InstanceType> cache,
       InstanceType instance) {
+    cache.put(key, instance);
+  }
+
+  private void putInNodeCache(String key, Map<String, Set<InstanceType>> cache,
+      InstanceType instance) {
     Set<InstanceType> instanceSet = cache.get(key);
     if (instanceSet == null) {
-      instanceSet = Sets.newHashSet();
-      cache.put(key, instanceSet);
+      instanceSet = new HashSet<>();
+      instanceSet.add(instance);
     }
-    instanceSet.add(instance);
+    cache.put(key, instanceSet);
   }
 
   protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) {
@@ -403,7 +458,7 @@ protected final void populateCache(PathChildrenCache instancesCache, boolean doI
 
   protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException;
 
-  protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
+  protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
     if (childData == null) return null;
     byte[] data = childData.getData();
     if (data == null) return null;
@@ -415,8 +470,7 @@
     private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
 
     @Override
-    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
-        throws Exception {
+    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
       Preconditions.checkArgument(client != null
          && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
 
@@ -427,28 +481,32 @@
       if (!nodeName.startsWith(workerNodePrefix)) return;
       LOG.info("{} for zknode {}", event.getType(), childData.getPath());
       InstanceType instance = extractServiceInstance(event, childData);
-      int ephSeqVersion = extractSeqNum(nodeName);
-      switch (event.getType()) {
-        case CHILD_ADDED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onCreate(instance, ephSeqVersion);
-          }
-          break;
-        case CHILD_UPDATED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onUpdate(instance, ephSeqVersion);
-          }
-          break;
-        case CHILD_REMOVED:
-          removeFromCache(childData.getPath(), instance.getHost());
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onRemove(instance, ephSeqVersion);
+      if (instance != null) {
+        int ephSeqVersion = extractSeqNum(nodeName);
+        switch (event.getType()) {
+          case CHILD_ADDED:
+            addToCache(childData.getPath(), instance.getHost(), instance);
+            for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+              listener.onCreate(instance, ephSeqVersion);
+            }
+            break;
+          case CHILD_UPDATED:
+            addToCache(childData.getPath(), instance.getHost(), instance);
+            for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+              listener.onUpdate(instance, ephSeqVersion);
+            }
+            break;
+          case CHILD_REMOVED:
+            removeFromCache(childData.getPath(), instance.getHost());
+            for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+              listener.onRemove(instance, ephSeqVersion);
+            }
+            break;
+          default:
+            // Ignore all the other events; logged above.
         }
-        break;
-      default:
-        // Ignore all the other events; logged above.
+      } else {
+        LOG.info("instance is null for event: {} childData: {}", event.getType(), childData);
       }
     }
   }
@@ -464,7 +522,7 @@ protected final int sizeInternal() {
 
   protected final Set<InstanceType> getByHostInternal(String host) {
     Set<InstanceType> byHost = nodeToInstanceCache.get(host);
-    byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
+    byHost = (byHost == null) ? Sets.newHashSet() : byHost;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
     }
@@ -472,11 +530,7 @@ protected final int sizeInternal() {
   }
 
   protected final Collection<InstanceType> getAllInternal() {
-    Set<InstanceType> instances = new HashSet<>();
-    for (Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
-      instances.addAll(instanceSet);
-    }
-    return instances;
+    return new HashSet<>(pathToInstanceCache.values());
   }
 
   private static String extractNodeName(ChildData childData) {
@@ -564,13 +618,17 @@ public void start() throws IOException {
         CloseableUtils.class.getName();
   }
 
+  protected void unregisterInternal() {
+    CloseableUtils.closeQuietly(znode);
+  }
+
   public void stop() {
     CloseableUtils.closeQuietly(znode);
     CloseableUtils.closeQuietly(instancesCache);
     CloseableUtils.closeQuietly(zooKeeperClient);
   }
 
-  protected final Set<InstanceType> getInstancesByPath(String path) {
+  protected final InstanceType getInstanceByPath(String path) {
     return pathToInstanceCache.get(path);
   }
 
@@ -588,4 +646,12 @@ private int extractSeqNum(String nodeName) {
       throw e;
     }
   }
+
+  // for debugging
+  private class ZkConnectionStateListener implements ConnectionStateListener {
+    @Override
+    public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+      LOG.info("Connection state change notification received. State: {}", connectionState);
+    }
+  }
 }
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 0120639..3aec46b 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
@@ -343,7 +344,7 @@ private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapS
   private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host)
      throws IOException {
     InetAddress address = InetAddress.getByName(host);
-    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
     LlapServiceInstance serviceInstance = null;
 
     // The name used in the service registry may not match the host name we're using.
@@ -375,7 +376,7 @@ private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService regist
 
   private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService)
      throws IOException {
-    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
     LlapServiceInstance serviceInstance = null;
 
     LOG.info("Finding random live service instance");
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 58bf8dc..b944fad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -34,8 +34,10 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
@@ -230,7 +232,8 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) {
         }
         jg.writeStringField("identity", registry.getWorkerIdentity());
         jg.writeArrayFieldStart("peers");
-        for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) {
+        ServiceInstanceSet<LlapServiceInstance> instanceSet = registry.getInstances();
+        for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) {
           jg.writeStartObject();
           jg.writeStringField("identity", s.getWorkerIdentity());
           jg.writeStringField("host", s.getHost());
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 66de3b8..6ddecca 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -14,6 +14,16 @@ package org.apache.hadoop.hive.llap.tezplugins; +import com.google.common.io.ByteArrayDataOutput; + +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; + +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 46cfe56..a051f90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -98,13 +98,18 @@ public static TezSessionPoolManager getInstance() { protected TezSessionPoolManager() { } - public void startPool() throws Exception { + public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception { if (defaultSessionPool != null) { defaultSessionPool.start(); } if (expirationTracker != null) { expirationTracker.start(); } + initTriggers(conf); + if (resourcePlan != null) { + updateTriggers(resourcePlan); + LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); + } } public void setupPool(HiveConf conf) throws Exception { @@ -157,8 +162,6 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - initTriggers(conf); - String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index d1b3fec..d3748ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -18,25 +18,20 @@ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.util.concurrent.SettableFuture; -import org.apache.hadoop.hive.registry.impl.TezAmInstance; -import org.apache.hadoop.conf.Configuration; import java.io.IOException; import java.net.URISyntaxException; -import java.util.Collection; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.security.auth.login.LoginException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; -import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.tez.dag.api.TezException; + import com.google.common.annotations.VisibleForTesting; 
import com.google.common.util.concurrent.ThreadFactoryBuilder; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b98fb58..046ea19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -738,7 +738,10 @@ public LocalResource getAppJarLr() { private Path createTezDir(String sessionId, String suffix) throws IOException { // tez needs its own scratch dir (per session) // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. - Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); + SessionState sessionState = SessionState.get(); + String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState + .getHdfsScratchDirURIString(); + Path tezDir = new Path(hdfsScratchDir, TEZ_DIR); tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix))); FileSystem fs = tezDir.getFileSystem(conf); FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index bc438bb..1b7321b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -52,7 +53,7 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); Collection serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(true); + serviceRegistry.getInstances().getAllInstancesOrdered(true); Preconditions.checkArgument(!serviceInstances.isEmpty(), "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); ArrayList locations = new ArrayList<>(serviceInstances.size()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index a8d729d..0d1990a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +113,7 @@ public boolean initClusterInfo() { return false; // Don't fail; this is best-effort. 
} } - LlapServiceInstanceSet instances; + ServiceInstanceSet instances; try { instances = svc.getInstances(10); } catch (IOException e) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d261623..d5b683f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Before; @@ -90,7 +93,7 @@ public void testSessionPoolGetInOrder() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); // this is now a LIFO operation // draw 1 and replace @@ -153,7 +156,7 @@ public void testSessionPoolThreads() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); TezSessionState[] sessions = new TezSessionState[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { @@ -234,7 +237,7 @@ public void testLlapSessionQueuing() { conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { LOG.error("Initialization error", e); fail(); @@ -295,7 +298,7 @@ public void testReturn() { try { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { e.printStackTrace(); fail(); diff --git a/service/pom.xml b/service/pom.xml index 9ad7555..e3774df 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -264,7 +264,48 @@ ${junit.version} test - + + org.apache.tez + tez-api + ${tez.version} + test + + + org.apache.tez + tez-runtime-library + ${tez.version} + test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.tez + tez-dag + ${tez.version} + test + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + org.apache.hadoop + hadoop-yarn-server-web-proxy + + + org.mockito mockito-all diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java new file mode 100644 index 0000000..6514d98 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import static org.apache.hive.service.server.HiveServer2.INSTANCE_URI_CONFIG; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hive.service.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class HS2ActivePassiveHARegistry extends ZkRegistryBase implements + ServiceRegistry, HiveServer2HAInstanceSet { + private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class); + static final String ACTIVE_ENDPOINT = "activeEndpoint"; + static final String PASSIVE_ENDPOINT = "passiveEndpoint"; + private static final String SASL_LOGIN_CONTEXT_NAME = "HS2ActivePassiveHAZooKeeperClient"; + private static final String INSTANCE_PREFIX = "instance-"; + private static final String INSTANCE_GROUP = "instances"; + private static final String LEADER_LATCH_PATH = "/_LEADER"; + private LeaderLatch leaderLatch; + private ServiceRecord srv; + private boolean isClient; + + // There are 2 paths under which the instances get registered + // 1) Standard path used by ZkRegistryBase where all instances register themselves (also stores metadata) + // Secure: /hs2ActivePassiveHA-sasl/instances/instance-0000000000 + // Unsecure: /hs2ActivePassiveHA-unsecure/instances/instance-0000000000 + // 2) Leader latch path used for HS2 HA Active/Passive configuration where all instances register under _LEADER + // path but only one among them is the leader + // Secure: /hs2ActivePassiveHA-sasl/_LEADER/xxxx-latch-0000000000 + // Unsecure: /hs2ActivePassiveHA-unsecure/_LEADER/xxxx-latch-0000000000 + static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) { + String zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE); + Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace), + 
HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty"); + String principal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); + String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + String zkNameSpacePrefix = zkNameSpace + "-"; + return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab, + SASL_LOGIN_CONTEXT_NAME, conf, isClient); + } + + private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix, + final String leaderLatchPath, + final String krbPrincipal, final String krbKeytab, final String saslContextName, final Configuration conf, + final boolean isClient) { + super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP, + saslContextName, krbPrincipal, krbKeytab, null); + this.isClient = isClient; + leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, UNIQUE_ID.toString(), + LeaderLatch.CloseMode.NOTIFY_LEADER); + } + + @Override + public void start() throws IOException { + super.start(); + if (!isClient) { + this.srv = getNewServiceRecord(); + register(); + leaderLatch.addListener(new HS2LeaderLatchListener()); + try { + // all participating instances use the same latch path, and curator randomly chooses one instance to be the leader, + // which can be verified via leaderLatch.hasLeadership() + leaderLatch.start(); + } catch (Exception e) { + throw new IOException(e); + } + LOG.info("Registered HS2 with ZK. service record: {}", srv); + } else { + populateCache(); + } + } + + @Override + protected void unregisterInternal() { + super.unregisterInternal(); + } + + @Override + public String register() throws IOException { + updateEndpoint(srv, PASSIVE_ENDPOINT); + return registerServiceRecord(srv); + } + + @Override + public void unregister() { + CloseableUtils.closeQuietly(leaderLatch); + unregisterInternal(); + } + + private void populateCache() throws IOException { + PathChildrenCache pcc = ensureInstancesCache(0); + populateCache(pcc, false); + } + + @Override + public ServiceInstanceSet getInstances(final String component, final long clusterReadyTimeoutMs) + throws IOException { + throw new IOException("Not supported to get instances by component name"); + } + + private void addActiveEndpointToServiceRecord() throws IOException { + addEndpointToServiceRecord(getNewServiceRecord(), ACTIVE_ENDPOINT); + } + + private void addPassiveEndpointToServiceRecord() throws IOException { + addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT); + } + + private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) throws IOException { + updateEndpoint(srv, endpointName); + updateServiceRecord(srv); + } + + private void updateEndpoint(final ServiceRecord srv, final String endpointName) { + final String instanceUri = srv.get(INSTANCE_URI_CONFIG); + final String[] tokens = instanceUri.split(":"); + final String hostname = tokens[0]; + final int port = Integer.parseInt(tokens[1]); + Endpoint urlEndpoint = RegistryTypeUtils.ipcEndpoint(endpointName, new InetSocketAddress(hostname, port)); + srv.addInternalEndpoint(urlEndpoint); + LOG.info("Added {} endpoint to service record", urlEndpoint); + } + + @Override + public void stop() { + CloseableUtils.closeQuietly(leaderLatch); + super.stop(); + } + + @Override + protected HiveServer2Instance createServiceInstance(final ServiceRecord srv) throws IOException { + Endpoint activeEndpoint =
srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT); + return new HiveServer2Instance(srv, activeEndpoint != null ? ACTIVE_ENDPOINT : PASSIVE_ENDPOINT); + } + + @Override + public synchronized void registerStateChangeListener( + final ServiceInstanceStateChangeListener listener) + throws IOException { + super.registerStateChangeListener(listener); + } + + @Override + public ApplicationId getApplicationId() throws IOException { + throw new IOException("Not supported until HS2 runs as YARN application"); + } + + @Override + protected String getZkPathUser(final Configuration conf) { + return RegistryUtils.currentUser(); + } + + private boolean hasLeadership() { + return leaderLatch.hasLeadership(); + } + + private class HS2LeaderLatchListener implements LeaderLatchListener { + + // leadership state changes and the sending of notifications to listeners happen inside a synchronized method in curator. + // Do only lightweight actions in the main event-handler thread. Time-consuming operations are handled via the separate + // executor service registered via registerLeaderLatchListener(). + @Override + public void isLeader() { + // only the leader publishes its instance uri as an endpoint, which clients use to make connections to HS2 via + // service discovery. + try { + if (!hasLeadership()) { + LOG.info("isLeader notification received but hasLeadership returned false.. awaiting.."); + leaderLatch.await(); + } + addActiveEndpointToServiceRecord(); + LOG.info("HS2 instance in ACTIVE mode. Service record: {}", srv); + } catch (Exception e) { + throw new ServiceException("Unable to add active endpoint to service record", e); + } + } + + @Override + public void notLeader() { + try { + if (hasLeadership()) { + LOG.info("notLeader notification received but hasLeadership returned true.. awaiting.."); + leaderLatch.await(); + } + addPassiveEndpointToServiceRecord(); + LOG.info("HS2 instance lost leadership. Switched to PASSIVE standby mode. Service record: {}", srv); + } catch (Exception e) { + throw new ServiceException("Unable to add passive endpoint to service record", e); + } + } + } + + @Override + public HiveServer2Instance getLeader() { + for (HiveServer2Instance hs2Instance : getAll()) { + if (hs2Instance.isLeader()) { + return hs2Instance; + } + } + return null; + } + + @Override + public Collection getAll() { + return getAllInternal(); + } + + @Override + public HiveServer2Instance getInstance(final String instanceId) { + for (HiveServer2Instance hs2Instance : getAll()) { + if (hs2Instance.getWorkerIdentity().equals(instanceId)) { + return hs2Instance; + } + } + return null; + } + + @Override + public Set getByHost(final String host) { + return getByHostInternal(host); + } + + @Override + public int size() { + return sizeInternal(); + } + + /** + * If leadership-related notifications are desired, use this method to register a leader latch listener.
+ * + * @param latchListener - listener + * @param executorService - event handler executor service + */ + void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) { + leaderLatch.addListener(latchListener, executorService); + } + + private Map getConfsToPublish() { + final Map confsToPublish = new HashMap<>(); + // Hostname + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname)); + // Hostname:port + confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG)); + confsToPublish.put(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + // Transport mode + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname)); + // Transport specific confs + if (HiveServer2.isHTTPTransportMode(conf)) { + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname)); + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname)); + } else { + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)); + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname)); + } + // Auth specific confs + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname)); + if (HiveServer2.isKerberosAuthMode(conf)) { + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname)); + } + // SSL conf + confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname, + conf.get(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname)); + return confsToPublish; + } + + private ServiceRecord getNewServiceRecord() { + ServiceRecord srv = new ServiceRecord(); + final Map confsToPublish = getConfsToPublish(); + for (Map.Entry entry : confsToPublish.entrySet()) { + srv.set(entry.getKey(), entry.getValue()); + } + return srv; + } +} diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java new file mode 100644 index 0000000..512d4e8 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ + +package org.apache.hive.service.server; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class HS2ActivePassiveHARegistryClient { + private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistryClient.class); + private static final Map hs2Registries = new HashMap<>(); + + /** + * Helper method to get a HiveServer2HARegistry instance to read from the registry. Only used by clients (JDBC), + * service discovery to connect to active HS2 instance in Active/Passive HA configuration. + * + * @param conf {@link Configuration} instance which contains service registry information. + * @return HiveServer2HARegistry + */ + public static synchronized HS2ActivePassiveHARegistry getClient(Configuration conf) throws IOException { + String namespace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE); + Preconditions.checkArgument(!StringUtils.isBlank(namespace), + HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty"); + String nsKey = ZkRegistryBase.getRootNamespace(null, namespace + "-"); + HS2ActivePassiveHARegistry registry = hs2Registries.get(nsKey); + if (registry == null) { + registry = HS2ActivePassiveHARegistry.create(conf, true); + registry.start(); + hs2Registries.put(nsKey, registry); + } else { + LOG.debug("Returning cached registry client for nsKey: {}", nsKey); + } + return registry; + } +} diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index b7ece2b..5b792ac 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -27,10 +27,11 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -38,6 +39,7 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -45,8 +47,10 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JvmPauseMonitor; import org.apache.hadoop.hive.common.LogUtils; import 
org.apache.hadoop.hive.common.LogUtils.LogInitializationException; @@ -69,6 +73,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; @@ -86,6 +91,8 @@ import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; +import org.apache.hive.service.servlet.HS2LeadershipStatus; +import org.apache.hive.service.servlet.HS2Peers; import org.apache.hive.service.servlet.QueryProfileServlet; import org.apache.logging.log4j.util.Strings; import org.apache.zookeeper.CreateMode; @@ -101,6 +108,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * HiveServer2. @@ -109,16 +117,27 @@ public class HiveServer2 extends CompositeService { private static CountDownLatch deleteSignal; private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class); + public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri"; + private static final int SHUTDOWN_TIME = 60; private CLIService cliService; private ThriftCLIService thriftCLIService; private PersistentEphemeralNode znode; - private String znodePath; private CuratorFramework zooKeeperClient; private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens private HttpServer webServer; // Web UI private TezSessionPoolManager tezSessionPoolManager; private WorkloadManager wm; private PamAuthenticator pamAuthenticator; + private Map confsToPublish = new HashMap(); + private String serviceUri; + private boolean serviceDiscovery; + private boolean activePassiveHA; + private LeaderLatchListener leaderLatchListener; + private ExecutorService leaderActionsExecutorService; + private HS2ActivePassiveHARegistry hs2HARegistry; + private Hive sessionHive; + private String wmQueue; + private AtomicBoolean isLeader = new AtomicBoolean(false); public HiveServer2() { super(HiveServer2.class.getSimpleName()); @@ -140,13 +159,6 @@ public synchronized void init(HiveConf hiveConf) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) { MetricsFactory.init(hiveConf); } - - // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. - tezSessionPoolManager = TezSessionPoolManager.getInstance(); - tezSessionPoolManager.initTriggers(hiveConf); - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - tezSessionPoolManager.setupPool(hiveConf); - } } catch (Throwable t) { LOG.warn("Could not initiate the HiveServer2 Metrics system. 
Metrics may not be reported.", t); } @@ -191,15 +203,12 @@ public void run() { LlapRegistryService.getClient(hiveConf); } - Hive sessionHive = null; try { sessionHive = Hive.get(hiveConf); } catch (HiveException e) { throw new RuntimeException("Failed to get metastore connection", e); } - initializeWorkloadManagement(hiveConf, sessionHive); - // Create views registry HiveMaterializedViewsRegistry.get().init(); @@ -212,10 +221,17 @@ public void run() { } } + wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim(); + + this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); + this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE); + // Setup web UI + final int webUIPort; + final String webHost; try { - int webUIPort = - hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT); + webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT); + webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST); // We disable web UI in tests unless the test is explicitly setting a // unique web ui port so that we don't mess up ptests. boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST) && @@ -229,7 +245,7 @@ public void run() { LOG.info("Starting Web UI on port "+ webUIPort); HttpServer.Builder builder = new HttpServer.Builder("hiveserver2"); builder.setPort(webUIPort).setConf(hiveConf); - builder.setHost(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST)); + builder.setHost(webHost); builder.setMaxThreads( hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS)); builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE)); @@ -281,6 +297,12 @@ public void run() { throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL.varname + " has false value. It is recommended to set to true when PAM is used."); } } + if (serviceDiscovery && activePassiveHA) { + builder.setContextAttribute("hs2.isLeader", isLeader); + builder.setContextAttribute("hiveconf", hiveConf); + builder.addServlet("leader", HS2LeadershipStatus.class); + builder.addServlet("peers", HS2Peers.class); + } builder.addServlet("llap", LlapServlet.class); builder.addServlet("jdbcjar", JdbcJarDownloadServlet.class); builder.setContextRootRewriteTarget("/hiveserver2.jsp"); @@ -292,57 +314,25 @@ public void run() { } catch (IOException ie) { throw new ServiceException(ie); } - // Add a shutdown hook for catching SIGTERM & SIGINT - ShutdownHookManager.addShutdownHook(new Runnable() { - @Override - public void run() { - hiveServer2.stop(); - } - }); - } - private void initializeWorkloadManagement(HiveConf hiveConf, Hive sessionHive) { - String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); - boolean hasQueue = wmQueue != null && !wmQueue.isEmpty(); - WMFullResourcePlan resourcePlan; try { - resourcePlan = sessionHive.getActiveResourcePlan(); - } catch (Throwable e) { - if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST_SSL)) { - throw new RuntimeException(e); - } else { - resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured. 
- } - } - if (hasQueue && resourcePlan == null - && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { - LOG.info("Creating a default resource plan for test"); - resourcePlan = createTestResourcePlan(); - } - if (resourcePlan == null) { - if (!hasQueue) { - LOG.info("Workload management is not enabled and there's no resource plan"); - return; // TODO: we could activate it anyway, similar to the below; in case someone - // wants to activate a resource plan for Tez triggers only w/o restart. - } - LOG.warn("Workload management is enabled but there's no resource plan"); - } - - if (hasQueue) { - // Initialize workload management. - LOG.info("Initializing workload management"); - try { - wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); - } catch (ExecutionException | InterruptedException e) { - throw new ServiceException("Unable to instantiate Workload Manager", e); + if (serviceDiscovery) { + serviceUri = getServerInstanceURI(); + addConfsToPublish(hiveConf, confsToPublish, serviceUri); + if (activePassiveHA) { + hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); + leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get()); + leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Leader Actions Handler Thread").build()); + hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false); + } } + } catch (Exception e) { + throw new ServiceException(e); } - if (resourcePlan != null) { - tezSessionPoolManager.updateTriggers(resourcePlan); - LOG.info("Updated tez session pool manager with active resource plan: {}", - resourcePlan.getPlan().getName()); - } + // Add a shutdown hook for catching SIGTERM & SIGINT + ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop()); } private WMFullResourcePlan createTestResourcePlan() { @@ -356,10 +346,10 @@ private WMFullResourcePlan createTestResourcePlan() { return resourcePlan; } - public static boolean isHTTPTransportMode(HiveConf hiveConf) { + public static boolean isHTTPTransportMode(Configuration hiveConf) { String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); if (transportMode == null) { - transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); + transportMode = hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); } if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) { return true; @@ -367,8 +357,8 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) { return false; } - public static boolean isKerberosAuthMode(HiveConf hiveConf) { - String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION); + public static boolean isKerberosAuthMode(Configuration hiveConf) { + String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname); if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) { return true; } @@ -408,7 +398,7 @@ public static boolean isKerberosAuthMode(HiveConf hiveConf) { * @param hiveConf * @throws Exception */ - private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + private void addServerInstanceToZooKeeper(HiveConf hiveConf, Map confsToPublish) throws Exception { String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); String instanceURI = getServerInstanceURI(); @@ -449,8 +439,8 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { if 
(hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) { // HiveServer2 configs that this instance will publish to ZooKeeper, // so that the clients can read these and configure themselves properly. - Map confsToPublish = new HashMap(); - addConfsToPublish(hiveConf, confsToPublish); + + addConfsToPublish(hiveConf, confsToPublish, instanceURI); // Publish configs for this instance as the data on the node znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish); } else { @@ -467,7 +457,7 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); } setDeregisteredWithZooKeeper(false); - znodePath = znode.getActualPath(); + final String znodePath = znode.getActualPath(); // Set a watch on the znode if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) { // No node exists, throw exception @@ -487,10 +477,12 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { * Add conf keys, values that HiveServer2 will publish to ZooKeeper. * @param hiveConf */ - private void addConfsToPublish(HiveConf hiveConf, Map confsToPublish) { + private void addConfsToPublish(HiveConf hiveConf, Map confsToPublish, String serviceUri) { // Hostname confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST)); + // Hostname:port + confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri); // Transport mode confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)); @@ -540,6 +532,11 @@ private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception { } } + public boolean isLeader() { + return isLeader.get(); + } + + /** * The watcher class which sets the de-register flag when the znode corresponding to this server * instance is deleted. Additionally, it shuts down the server if there are no more active client @@ -609,10 +606,16 @@ public synchronized void start() { super.start(); // If we're supporting dynamic service discovery, we'll add the service uri for this // HiveServer2 instance to Zookeeper as a znode. - HiveConf hiveConf = this.getHiveConf(); - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + HiveConf hiveConf = getHiveConf(); + if (serviceDiscovery) { try { - addServerInstanceToZooKeeper(hiveConf); + if (activePassiveHA) { + hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService); + hs2HARegistry.start(); + LOG.info("HS2 HA registry started"); + } else { + addServerInstanceToZooKeeper(hiveConf, confsToPublish); + } } catch (Exception e) { LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e); throw new ServiceException(e); @@ -627,22 +630,125 @@ public synchronized void start() { throw new ServiceException(e); } } + + if (!activePassiveHA) { + LOG.info("HS2 interactive HA not enabled. Starting tez sessions.."); + startOrReconnectTezSessions(); + } else { + LOG.info("HS2 interactive HA enabled. 
Tez sessions will be started/reconnected by the leader."); + } + } + + private static class HS2LeaderLatchListener implements LeaderLatchListener { + private HiveServer2 hiveServer2; + private SessionState parentSession; + + HS2LeaderLatchListener(final HiveServer2 hs2, final SessionState parentSession) { + this.hiveServer2 = hs2; + this.parentSession = parentSession; + } + + // leadership status changes happen inside the synchronized method LeaderLatch.setLeadership(). + // Also, we use a single-threaded executor service for handling notifications, which guarantees ordering of + // notification handling. If a leadership status change happens when tez sessions are getting created, + // the notLeader notification will get queued in the executor service. + @Override + public void isLeader() { + LOG.info("HS2 instance {} became the LEADER. Starting/Reconnecting tez sessions..", hiveServer2.serviceUri); + hiveServer2.isLeader.set(true); + if (parentSession != null) { + SessionState.setCurrentSessionState(parentSession); + } + hiveServer2.startOrReconnectTezSessions(); + LOG.info("Started/Reconnected tez sessions."); + } + + @Override + public void notLeader() { + LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); + hiveServer2.isLeader.set(false); + hiveServer2.stopOrDisconnectTezSessions(); + LOG.info("Stopped/Disconnected tez sessions."); + } + } + + private void startOrReconnectTezSessions() { + LOG.info("Starting/Reconnecting tez sessions.."); + // TODO: add tez session reconnect after TEZ-3875 + WMFullResourcePlan resourcePlan = null; + if (!StringUtils.isEmpty(wmQueue)) { + try { + resourcePlan = sessionHive.getActiveResourcePlan(); + } catch (HiveException e) { + if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST_SSL)) { + throw new RuntimeException(e); + } else { + resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured. + } + } + + if (resourcePlan == null && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { + LOG.info("Creating a default resource plan for test"); + resourcePlan = createTestResourcePlan(); + } + } + initAndStartTezSessionPoolManager(resourcePlan); + initAndStartWorkloadManager(resourcePlan); + } + + private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) { + // start the Tez session pool here, after the parent session state has been initialized by CLIService, so that + // SessionState.get() does not return null during createTezDir + try { + // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez sessions. + LOG.info("Initializing tez session pool manager"); + tezSessionPoolManager = TezSessionPoolManager.getInstance(); + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + tezSessionPoolManager.setupPool(hiveConf); + } + tezSessionPoolManager.startPool(hiveConf, resourcePlan); + LOG.info("Tez session pool manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to set up tez session pool", e); + } + } + + private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) { + if (!StringUtils.isEmpty(wmQueue)) { + // Initialize workload management.
+ LOG.info("Initializing workload management"); + try { + wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); + wm.start(); + LOG.info("Workload manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to instantiate and start Workload Manager", e); + } + } else { + LOG.info("Workload management is not enabled."); + } + } + + private void stopOrDisconnectTezSessions() { + LOG.info("Stopping/Disconnecting tez sessions."); + // There should already be an instance of the session pool manager. + // If not, ignoring is fine while stopping HiveServer2. if (tezSessionPoolManager != null) { try { - tezSessionPoolManager.startPool(); - LOG.info("Started tez session pool manager.."); + tezSessionPoolManager.stop(); + LOG.info("Stopped tez session pool manager."); } catch (Exception e) { - LOG.error("Error starting tez session pool manager: ", e); - throw new ServiceException(e); + LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); } } if (wm != null) { try { - wm.start(); - LOG.info("Started workload manager.."); + wm.stop(); + LOG.info("Stopped workload manager."); } catch (Exception e) { - LOG.error("Error starting workload manager", e); - throw new ServiceException(e); + LOG.error("Workload manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); } } } @@ -652,6 +758,12 @@ public synchronized void stop() { LOG.info("Shutting down HiveServer2"); HiveConf hiveConf = this.getHiveConf(); super.stop(); + if (hs2HARegistry != null) { + hs2HARegistry.stop(); + shutdownExecutor(leaderActionsExecutorService); + LOG.info("HS2 HA registry stopped"); + hs2HARegistry = null; + } if (webServer != null) { try { webServer.stop(); @@ -670,32 +782,15 @@ public synchronized void stop() { } } // Remove this server instance from ZooKeeper if dynamic service discovery is set - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + if (serviceDiscovery && !activePassiveHA) { try { removeServerInstanceFromZooKeeper(); } catch (Exception e) { LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e); } } - // There should already be an instance of the session pool manager. - // If not, ignoring is fine while stopping HiveServer2. - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) && - tezSessionPoolManager != null) { - try { - tezSessionPoolManager.stop(); - } catch (Exception e) { - LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } - if (wm != null) { - try { - wm.stop(); - } catch (Exception e) { - LOG.error("Workload manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } + + stopOrDisconnectTezSessions(); if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { try { @@ -706,6 +801,22 @@ public synchronized void stop() { } } + private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) { + leaderActionsExecutorService.shutdown(); + try { + if (!leaderActionsExecutorService.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { + LOG.warn("Executor service did not terminate in the specified time {} sec", SHUTDOWN_TIME); + List droppedTasks = leaderActionsExecutorService.shutdownNow(); + LOG.warn("Executor service was abruptly shut down. 
" + droppedTasks.size() + " tasks will not be executed."); + } + } catch (InterruptedException e) { + LOG.warn("Executor service did not terminate in the specified time {} sec. Exception: {}", SHUTDOWN_TIME, + e.getMessage()); + List droppedTasks = leaderActionsExecutorService.shutdownNow(); + LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); + } + } + @VisibleForTesting public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) { diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java new file mode 100644 index 0000000..b31d63c --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import org.apache.hadoop.hive.registry.ServiceInstanceSet; + +public interface HiveServer2HAInstanceSet extends ServiceInstanceSet { + + /** + * In Active/Passive setup, returns current active leader. + * + * @return leader instance + */ + HiveServer2Instance getLeader(); +} diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java new file mode 100644 index 0000000..558e809 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.service.server; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; + +import com.google.common.base.Preconditions; + +public class HiveServer2Instance extends ServiceInstanceBase { + private boolean isLeader; + private String transportMode; + private String httpEndpoint; + + // empty c'tor to make jackson happy + public HiveServer2Instance() { + + } + + public HiveServer2Instance(final ServiceRecord srv, final String endPointName) throws IOException { + super(srv, endPointName); + + Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT); + Endpoint passiveEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.PASSIVE_ENDPOINT); + this.isLeader = activeEndpoint != null; + Preconditions.checkArgument(activeEndpoint == null || passiveEndpoint == null, + "Incorrect service record. Both active and passive endpoints cannot be non-null!"); + this.transportMode = srv.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); + if (transportMode.equalsIgnoreCase("http")) { + this.httpEndpoint = srv.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname); + } else { + this.httpEndpoint = ""; + } + } + + public boolean isLeader() { + return isLeader; + } + + public String getTransportMode() { + return transportMode; + } + + public String getHttpEndpoint() { + return httpEndpoint; + } + + public void setLeader(final boolean leader) { + isLeader = leader; + } + + public void setTransportMode(final String transportMode) { + this.transportMode = transportMode; + } + + public void setHttpEndpoint(final String httpEndpoint) { + this.httpEndpoint = httpEndpoint; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + HiveServer2Instance other = (HiveServer2Instance) o; + return super.equals(o) && isLeader == other.isLeader + && Objects.equals(transportMode, other.transportMode) + && Objects.equals(httpEndpoint, other.httpEndpoint); + } + + @Override + public int hashCode() { + return super.hashCode() + Objects.hashCode(isLeader) + Objects.hashCode(transportMode) + Objects.hashCode(httpEndpoint); + } + + @Override + public String toString() { + String result = "instanceId: " + getWorkerIdentity() + " isLeader: " + isLeader + " host: " + getHost() + + " port: " + getRpcPort() + " transportMode: " + transportMode; + if (httpEndpoint != null) { + result += " httpEndpoint: " + httpEndpoint; + } + return result; + } +} diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java new file mode 100644 index 0000000..921a23e --- /dev/null +++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.servlet; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Returns 200 if this HS2 instance is the leader. + */ +public class HS2LeadershipStatus extends HttpServlet { + private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class); + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + ServletContext ctx = getServletContext(); + AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader"); + LOG.info("Returning isLeader: {}", isLeader); + ObjectMapper mapper = new ObjectMapper(); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), isLeader); + response.setStatus(HttpServletResponse.SC_OK); + response.flushBuffer(); + } +} diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java new file mode 100644 index 0000000..a51bbeb --- /dev/null +++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.servlet; + +import java.io.IOException; +import java.util.Collection; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.server.HS2ActivePassiveHARegistry; +import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; +import org.apache.hive.service.server.HiveServer2Instance; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; + +/** + * Returns all HS2 instances in Active-Passive standby mode. 
+ */ +public class HS2Peers extends HttpServlet { + public static class HS2Instances { + private Collection hiveServer2Instances; + + // empty c'tor to make jackson happy + public HS2Instances() { + } + + public HS2Instances(final Collection hiveServer2Instances) { + this.hiveServer2Instances = hiveServer2Instances; + } + + public Collection getHiveServer2Instances() { + return hiveServer2Instances; + } + + public void setHiveServer2Instances(final Collection hiveServer2Instances) { + this.hiveServer2Instances = hiveServer2Instances; + } + } + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { + ServletContext ctx = getServletContext(); + HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf"); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); + // serialize json based on field annotations only + mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker() + .withSetterVisibility(JsonAutoDetect.Visibility.NONE)); + HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf); + HS2Instances instances = new HS2Instances(hs2Registry.getAll()); + mapper.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances); + response.setStatus(HttpServletResponse.SC_OK); + response.flushBuffer(); + } +}
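
To make the intended client-side flow concrete: the HS2ActivePassiveHARegistryClient introduced above can be used to resolve the current leader directly from ZooKeeper. The following is a minimal sketch, not part of the patch; it assumes HA service discovery is already configured in the HiveConf and that a leader has been elected (getLeader() returns null otherwise).

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;

public class FindActiveHS2 {
  public static void main(String[] args) throws Exception {
    // Assumes hive.server2.support.dynamic.service.discovery and
    // hive.server2.active.passive.ha.enable are set, and the ZooKeeper quorum is reachable.
    HiveConf conf = new HiveConf();
    // getClient() caches one started registry client per root namespace.
    HS2ActivePassiveHARegistry registry = HS2ActivePassiveHARegistryClient.getClient(conf);
    HiveServer2Instance leader = registry.getLeader(); // null until a leader is elected
    if (leader != null) {
      System.out.println("Active HS2: " + leader.getHost() + ":" + leader.getRpcPort()
          + " (transportMode=" + leader.getTransportMode() + ")");
    }
  }
}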
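The PathChildrenCache events handled in ZkRegistryBase (CHILD_ADDED/CHILD_UPDATED/CHILD_REMOVED) fan out to registered ServiceInstanceStateChangeListeners, so a client can also watch leadership moves without polling. A sketch under the assumption that the listener interface is parameterized by instance type and its callbacks take the instance plus the ephemeral sequence number, matching the dispatch code in the ZkRegistryBase hunk; the class name is hypothetical.

import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hive.service.server.HiveServer2Instance;

// Hypothetical listener mirroring the onCreate/onUpdate/onRemove callbacks
// invoked from ZkRegistryBase's PathChildrenCache event handler.
public class LoggingHS2StateListener implements ServiceInstanceStateChangeListener<HiveServer2Instance> {
  @Override
  public void onCreate(HiveServer2Instance instance, int ephSeqVersion) {
    System.out.println("HS2 instance registered (seq " + ephSeqVersion + "): " + instance);
  }

  @Override
  public void onUpdate(HiveServer2Instance instance, int ephSeqVersion) {
    // Fires when the service record changes, e.g. the active/passive endpoint flips on failover.
    System.out.println("HS2 instance updated (seq " + ephSeqVersion + "): " + instance);
  }

  @Override
  public void onRemove(HiveServer2Instance instance, int ephSeqVersion) {
    System.out.println("HS2 instance removed (seq " + ephSeqVersion + "): " + instance);
  }
}
// Usage: registry.registerStateChangeListener(new LoggingHS2StateListener());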
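The two servlets are registered on the HS2 Web UI (see the addServlet("leader", ...) and addServlet("peers", ...) calls in the HiveServer2 hunk), so leadership can also be probed over plain HTTP. A sketch assuming the servlet name maps to the /leader path, with a placeholder host and Web UI port (hive.server2.webui.port); /peers works the same way and returns the serialized HS2Instances.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckHS2Leadership {
  public static void main(String[] args) throws Exception {
    // "hs2-host" and 10002 are placeholders for the Web UI host/port.
    URL url = new URL("http://hs2-host:10002/leader");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    System.out.println("HTTP " + conn.getResponseCode()); // 200 from HS2LeadershipStatus
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line); // JSON-serialized leadership flag
      }
    }
  }
}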
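Finally, registerLeaderLatchListener() is package-private, so extra leadership hooks have to live in org.apache.hive.service.server; HiveServer2 wires its HS2LeaderLatchListener through it with a single-threaded executor so that callbacks run off Curator's event thread and stay ordered. A hypothetical auxiliary hook following the same pattern; the helper class and thread name are illustrative only.

package org.apache.hive.service.server;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Hypothetical helper, not part of the patch.
final class LeadershipAudit {
  static void attach(HS2ActivePassiveHARegistry registry) {
    // A single-threaded executor keeps isLeader()/notLeader() ordered, as in HiveServer2.
    ExecutorService notifier = Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Leadership Audit Thread").build());
    registry.registerLeaderLatchListener(new LeaderLatchListener() {
      @Override
      public void isLeader() {
        System.out.println("This HS2 instance gained leadership");
      }

      @Override
      public void notLeader() {
        System.out.println("This HS2 instance lost leadership");
      }
    }, notifier);
  }
}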