diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 711dfbd..17597ed 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2432,6 +2432,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The maximum number of past queries to show in HiverSever2 WebUI."), // Tez session settings + HIVE_SERVER2_INTERACTIVE_HA_ENABLE("hive.server2.interactive.ha.enable", false, + "Whether HiveServer2 Active/Passive High Availability should be enabled when Hive Interactive sessions are enabled. " + + "This will also require hive.server2.support.dynamic.service.discovery to be enabled."), + HIVE_SERVER2_INTERACTIVE_HA_REGISTRY_NAMESPACE("hive.server2.interactive.ha.registry.namespace", + "hs2-interactive-ha", + "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" + + "instances with zookeeper"), HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "", "A single YARN queues to use for Hive Interactive sessions. 
When this is specified,\n" + "workload management is enabled and used for these sessions."), diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index 5d7f813..6178b4b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -14,13 +14,16 @@ package org.apache.hadoop.hive.llap.registry; import java.io.IOException; + +import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.yarn.api.records.ApplicationId; /** * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. */ -public interface ServiceRegistry { +public interface ServiceRegistry { /** * Start the service registry @@ -49,14 +52,14 @@ * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not * started yet. 0 means do not wait. */ - LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException; + ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws + IOException; /** * Adds state change listeners for service instances. * @param listener - state change listener */ - void registerStateChangeListener( - ServiceInstanceStateChangeListener listener) throws IOException; + void registerStateChangeListener(ServiceInstanceStateChangeListener listener) throws IOException; /** * @return The application ID of the LLAP cluster. 
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index c88198f..f99d86c 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -267,8 +268,7 @@ public LlapServiceInstanceSet getInstances(String component, long timeoutMs) thr } @Override - public void registerStateChangeListener( - final ServiceInstanceStateChangeListener listener) { + public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException { // nothing to set LOG.warn("Callbacks for instance state changes are not supported in fixed registry."); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 80a6aba..6d0a552 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import 
org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; @@ -35,7 +36,7 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class); - private ServiceRegistry registry = null; + private ServiceRegistry registry = null; private final boolean isDaemon; private boolean isDynamic = false; private String identity = "(pending)"; @@ -131,11 +132,11 @@ private void unregisterWorker() throws IOException { } } - public LlapServiceInstanceSet getInstances() throws IOException { + public ServiceInstanceSet getInstances() throws IOException { return getInstances(0); } - public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { + public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { return this.registry.getInstances("LLAP", clusterReadyTimeoutMs); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 8339230..ee80429 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory; public class LlapZookeeperRegistryImpl - extends ZkRegistryBase implements ServiceRegistry { + extends ZkRegistryBase implements ServiceRegistry { private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class); /** @@ -79,7 +79,7 @@ public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { super(instanceName, conf, HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, - USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, 
LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE), diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java index 783a19f..739a368 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -55,7 +56,7 @@ private final SocketFactory socketFactory; private final RetryPolicy retryPolicy; private final Configuration conf; - private LlapServiceInstanceSet activeInstances; + private ServiceInstanceSet activeInstances; private Collection lastKnownInstances; private LlapManagementProtocolClientImpl client; private LlapServiceInstance clientInstance; diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java new file mode 100644 index 0000000..e069e43 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.UUID; + +public class RegistryUtilities { + private static final String LOCALHOST = "localhost"; + + /** + * Will return hostname stored in InetAddress. + * + * @return hostname + */ + public static String getHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + /** + * Will return FQDN of the host after doing reverse DNS lookup. + * + * @return FQDN of host + */ + public static String getCanonicalHostName() { + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + return LOCALHOST; + } + } + + public static String getUUID() { + return String.valueOf(UUID.randomUUID()); + } +} \ No newline at end of file diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java index 908b3bb..4493e99 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java @@ -21,27 +21,27 @@ * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought * back on the same host/port */ - public abstract String getWorkerIdentity(); + String getWorkerIdentity(); /** * Hostname of the service instance * - * @return + * @return service hostname */ - public abstract String 
getHost(); + String getHost(); /** * RPC Endpoint for service instance - * - * @return + * + * @return rpc port */ - public int getRpcPort(); + int getRpcPort(); /** * Config properties of the Service Instance (llap.daemon.*) - * - * @return + * + * @return properties */ - public abstract Map getProperties(); + Map getProperties(); } \ No newline at end of file diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java index 34fba5c..63178cc 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java @@ -29,15 +29,15 @@ * The worker identity does not collide between restarts, so each restart will have a unique id, * while having the same host/ip pair. * - * @return + * @return instance list */ Collection getAll(); /** * Get an instance by worker identity. * - * @param name - * @return + * @param name worker id + * @return instance */ InstanceType getInstance(String name); @@ -46,13 +46,13 @@ * * The list could include dead and alive instances. * - * @param host - * @return + * @param host hostname + * @return instance list */ Set getByHost(String host); /** - * Get number of instances in the currently availabe. + * Get number of instances in the currently available. 
* * @return - number of instances */ diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java index f1890e6..cc1ba33 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index 0724cf5..6469adf 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -13,29 +13,26 @@ */ package org.apache.hadoop.hive.registry.impl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; import org.apache.commons.codec.binary.Base64; - -import com.google.common.io.ByteStreams; - -import org.apache.tez.common.security.JobTokenIdentifier; - -import org.apache.hadoop.security.token.Token; - -import java.io.IOException; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.ByteStreams; public class TezAmInstance extends ServiceInstanceBase { private static final Logger LOG = 
LoggerFactory.getLogger(TezAmInstance.class); private final int pluginPort; private Token token; - public TezAmInstance(ServiceRecord srv) throws IOException { + TezAmInstance(ServiceRecord srv) throws IOException { super(srv, TezAmRegistryImpl.IPC_TEZCLIENT); final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN); if (plugin != null) { diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java index 417e571..ab02cf4 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -37,21 +37,19 @@ static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", AM_PLUGIN_JOBID = "am.plugin.jobid"; private final static String NAMESPACE_PREFIX = "tez-am-"; - private final static String USER_SCOPE_PATH_PREFIX = "user-"; - private static final String WORKER_PREFIX = "worker-"; private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; private final String registryName; - public static TezAmRegistryImpl create(Configuration conf, boolean b) { + public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) { String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); return StringUtils.isBlank(amRegistryName) ? null - : new TezAmRegistryImpl(amRegistryName, conf, true); + : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk); } private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) { - super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, useSecureZk ? 
SASL_LOGIN_CONTEXT_NAME : null, HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL), HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE), diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 17269dd..6dfddb1 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -13,6 +13,7 @@ */ package org.apache.hadoop.hive.registry.impl; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -50,6 +51,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.registry.RegistryUtilities; import org.apache.hadoop.hive.registry.ServiceInstance; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; @@ -77,9 +79,12 @@ private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class); private final static String SASL_NAMESPACE = "sasl"; private final static String UNSECURE_NAMESPACE = "unsecure"; - - static final String UNIQUE_IDENTIFIER = "registry.unique.id"; - private static final UUID uniq = UUID.randomUUID(); + protected final static String USER_SCOPE_PATH_PREFIX = "user-"; + protected static final String WORKER_PREFIX = "worker-"; + protected static final String WORKER_GROUP = "workers"; + public static final String UNIQUE_IDENTIFIER = "registry.unique.id"; + protected static final UUID UNIQUE_ID = UUID.randomUUID(); + private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls(); protected final Configuration conf; protected final CuratorFramework 
zooKeeperClient; @@ -109,29 +114,22 @@ private PathChildrenCache instancesCache; // Created on demand. /** Local hostname. */ - protected static final String hostname; - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } + protected static final String hostname = RegistryUtilities.getCanonicalHostName(); /** * @param rootNs A single root namespace override. Not recommended. - * @param nsPrefix The namespace prefix to use with default namespaces. + * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure' + * to namespace prefix to get effective root namespace). * @param userScopePathPrefix The prefix to use for the user-specific part of the path. * @param workerPrefix The prefix to use for each worker znode. + * @param workerGroup group name to use for all workers * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed. * @param zkPrincipal ZK security principal. * @param zkKeytab ZK security keytab. * @param aclsConfig A config setting to use to determine if ACLs should be verified. 
*/ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix, - String userScopePathPrefix, String workerPrefix, + String userScopePathPrefix, String workerPrefix, String workerGroup, String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) { this.conf = new Configuration(conf); this.saslLoginContextName = zkSaslLoginContextName; @@ -145,29 +143,51 @@ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, St this.disableMessage = ""; } this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - String zkEnsemble = getQuorumServers(this.conf); this.encoder = new RegistryUtils.ServiceRecordMarshal(); - int sessionTimeout = (int) HiveConf.getTimeVar(conf, - ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); - int baseSleepTime = (int) HiveConf.getTimeVar(conf, - ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); - int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 // worker-0000000 is the sequence number which will be retained until session timeout. If a // worker does not respond due to communication interruptions it will retain the same sequence // number when it returns back. If session timeout expires, the node will be deleted and new // addition of the same node (restart) will get next sequence number - this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf); - this.workerNodePrefix = workerPrefix; - this.workersPath = "/" + userPathPrefix + "/" + instanceName + "/workers"; + this.userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf); + this.workerNodePrefix = workerPrefix == null ? 
WORKER_PREFIX : workerPrefix; + this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup); this.instancesCache = null; this.stateChangeListeners = new HashSet<>(); this.pathToInstanceCache = new ConcurrentHashMap<>(); this.nodeToInstanceCache = new ConcurrentHashMap<>(); + final String namespace = getRootNamespace(rootNs, nsPrefix); + ACLProvider aclProvider; + // get acl provider for most outer path that is non-null + if (userPathPrefix == null) { + if (instanceName == null) { + if (workerGroup == null) { + aclProvider = getACLProviderForZKPath(namespace); + } else { + aclProvider = getACLProviderForZKPath(workerGroup); + } + } else { + aclProvider = getACLProviderForZKPath(instanceName); + } + } else { + aclProvider = getACLProviderForZKPath(userScopePathPrefix); + } + this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider); + } + + public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) { + final boolean isSecure = UserGroupInformation.isSecurityEnabled(); + String rootNs = userProvidedNamespace; + if (rootNs == null) { + rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); + } + return rootNs; + } + private ACLProvider getACLProviderForZKPath(String zkPath) { final boolean isSecure = UserGroupInformation.isSecurityEnabled(); - ACLProvider zooKeeperAclProvider = new ACLProvider() { + return new ACLProvider() { @Override public List getDefaultAcl() { // We always return something from getAclForPath so this should not happen. @@ -177,26 +197,32 @@ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, St @Override public List getAclForPath(String path) { - if (!isSecure || path == null || !path.contains(userPathPrefix)) { + if (!isSecure || path == null || !path.contains(zkPath)) { // No security or the path is below the user path - full access. 
return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); } return createSecureAcls(); } }; - if (rootNs == null) { - rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path. - } + } + + private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) { + String zkEnsemble = getQuorumServers(conf); + int sessionTimeout = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + int baseSleepTime = (int) HiveConf.getTimeVar(conf, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs - this.zooKeeperClient = CuratorFrameworkFactory.builder() - .connectString(zkEnsemble) - .sessionTimeoutMs(sessionTimeout) - .aclProvider(zooKeeperAclProvider) - .namespace(rootNs) - .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) - .build(); + return CuratorFrameworkFactory.builder() + .connectString(zkEnsemble) + .sessionTimeoutMs(sessionTimeout) + .aclProvider(zooKeeperAclProvider) + .namespace(namespace) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) + .build(); } private static List createSecureAcls() { @@ -213,7 +239,7 @@ public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, St * * @param conf **/ - private String getQuorumServers(Configuration conf) { + private static String getQuorumServers(Configuration conf) { String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname); String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); @@ -238,7 +264,7 @@ private String getQuorumServers(Configuration conf) { protected final String registerServiceRecord(ServiceRecord 
srv) throws IOException { // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); // Create a znode under the rootNamespace parent for this instance of the server try { @@ -275,9 +301,26 @@ protected final String registerServiceRecord(ServiceRecord srv) throws IOExcepti CloseableUtils.closeQuietly(znode); throw (e instanceof IOException) ? (IOException)e : new IOException(e); } - return uniq.toString(); + return UNIQUE_ID.toString(); } + protected final void updateServiceRecord(ServiceRecord srv) throws IOException { + try { + znode.setData(encoder.toBytes(srv)); + + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + } catch (Exception e) { + LOG.error("Unable to update znode with new service record", e); + CloseableUtils.closeQuietly(znode); + throw (e instanceof IOException) ? (IOException) e : new IOException(e); + } + } protected final void initializeWithoutRegisteringInternal() throws IOException { // Create a znode under the rootNamespace parent for this instance of the server @@ -564,6 +607,10 @@ public void start() throws IOException { CloseableUtils.class.getName(); } + public void unregisterInternal() { + CloseableUtils.closeQuietly(znode); + } + public void stop() { CloseableUtils.closeQuietly(znode); CloseableUtils.closeQuietly(instancesCache); diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java index c22ce4e..0d331b0 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/MetricsUtils.java @@ -24,6 +24,7 @@ /** * Utility methods for metrics system. 
*/ +// TODO: replace this with RegistryUtils public class MetricsUtils { private static final String LOCALHOST = "localhost"; public static final String METRICS_PROCESS_NAME = "LlapDaemon"; diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index fc360d4..ba60e47 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.NullWritable; @@ -343,7 +344,7 @@ private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapS private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { InetAddress address = InetAddress.getByName(host); - LlapServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstanceSet instanceSet = registryService.getInstances(); LlapServiceInstance serviceInstance = null; // The name used in the service registry may not match the host name we're using. 
@@ -375,7 +376,7 @@ private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService regist private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException { - LlapServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstanceSet instanceSet = registryService.getInstances(); LlapServiceInstance serviceInstance = null; LOG.info("Finding random live service instance"); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index 58bf8dc..b944fad 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -34,8 +34,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; @@ -230,7 +232,8 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) { } jg.writeStringField("identity", registry.getWorkerIdentity()); jg.writeArrayFieldStart("peers"); - for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) { + ServiceInstanceSet instanceSet = registry.getInstances(); + for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) { jg.writeStartObject(); jg.writeStringField("identity", 
s.getWorkerIdentity()); jg.writeStringField("host", s.getHost()); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index e97a267..788cde5 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -16,6 +16,7 @@ import com.google.common.io.ByteArrayDataOutput; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -149,7 +150,7 @@ public void setError(TaskInfo ctx, Throwable t) { private final Configuration conf; // interface into the registry service - private LlapServiceInstanceSet activeInstances; + private ServiceInstanceSet activeInstances; // Tracks all instances, including ones which have been disabled in the past. // LinkedHashMap to provide the same iteration order when selecting a random host. 
@@ -1225,7 +1226,8 @@ private SelectHostResult selectHost(TaskInfo request) { } /* fall through - miss in locality or no locality-requested */ - Collection instances = activeInstances.getAllInstancesOrdered(true); + Collection instances = ((LlapServiceInstanceSet) activeInstances) + .getAllInstancesOrdered(true); List allNodes = new ArrayList<>(instances.size()); List activeNodesWithFreeSlots = new ArrayList<>(); for (LlapServiceInstance inst : instances) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 3c1b8d0..fda6e9f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -99,13 +99,18 @@ public static TezSessionPoolManager getInstance() { protected TezSessionPoolManager() { } - public void startPool() throws Exception { + public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception { if (defaultSessionPool != null) { defaultSessionPool.start(); } if (expirationTracker != null) { expirationTracker.start(); } + initTriggers(conf); + if (resourcePlan != null) { + updateTriggers(resourcePlan); + LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); + } } public void setupPool(HiveConf conf) throws Exception { @@ -158,8 +163,6 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - initTriggers(conf); - String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 
5e892c6..2409e75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -702,7 +702,10 @@ public LocalResource getAppJarLr() { private Path createTezDir(String sessionId, String suffix) throws IOException { // tez needs its own scratch dir (per session) // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. - Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); + SessionState sessionState = SessionState.get(); + String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState + .getHdfsScratchDirURIString(); + Path tezDir = new Path(hdfsScratchDir, TEZ_DIR); tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix))); FileSystem fs = tezDir.getFileSystem(conf); FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index 4c8e7bb..97648e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -44,7 +45,7 @@ public static SplitLocationProvider getSplitLocationProvider(Configuration conf, LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId()); Collection serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(true); + 
((LlapServiceInstanceSet) serviceRegistry.getInstances()).getAllInstancesOrdered(true); Preconditions.checkArgument(!serviceInstances.isEmpty(), "No running LLAP daemons! Please check LLAP service status and zookeeper configuration"); ArrayList locations = new ArrayList<>(serviceInstances.size()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java index 2623a0e..973208e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +113,7 @@ public boolean initClusterInfo() { return false; // Don't fail; this is best-effort. 
} } - LlapServiceInstanceSet instances; + ServiceInstanceSet instances; try { instances = svc.getInstances(10); } catch (IOException e) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 8fbe9a7..a616021 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Before; @@ -90,7 +93,7 @@ public void testSessionPoolGetInOrder() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); // this is now a LIFO operation // draw 1 and replace @@ -153,7 +156,7 @@ public void testSessionPoolThreads() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); TezSessionState[] sessions = new TezSessionState[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { @@ -234,7 +237,7 @@ public void testLlapSessionQueuing() { conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + poolManager.startPool(conf, null); } catch (Exception e) { LOG.error("Initialization error", e); fail(); @@ -295,7 +298,7 @@ public void testReturn() { try { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); - poolManager.startPool(); + 
poolManager.startPool(conf, null); } catch (Exception e) { e.printStackTrace(); fail(); diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 223be6a..157f6bc 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -27,7 +27,7 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -38,6 +38,7 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -45,8 +46,10 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JvmPauseMonitor; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; @@ -98,6 +101,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * HiveServer2. 
@@ -106,6 +110,8 @@ public class HiveServer2 extends CompositeService { private static CountDownLatch deleteSignal; private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class); + public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri"; + private static final int SHUTDOWN_TIME = 60; private CLIService cliService; private ThriftCLIService thriftCLIService; private PersistentEphemeralNode znode; @@ -115,6 +121,14 @@ private HttpServer webServer; // Web UI private TezSessionPoolManager tezSessionPoolManager; private WorkloadManager wm; + private Map confsToPublish = new HashMap(); + private boolean serviceDiscovery; + private boolean activePassiveHA; + private LeaderLatchListener leaderLatchListener; + private ExecutorService leaderActionsExecutorService; + private HiveServer2HARegistry hs2HARegistry; + private Hive sessionHive; + private String wmQueue; public HiveServer2() { super(HiveServer2.class.getSimpleName()); @@ -128,13 +142,6 @@ public synchronized void init(HiveConf hiveConf) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) { MetricsFactory.init(hiveConf); } - - // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. - tezSessionPoolManager = TezSessionPoolManager.getInstance(); - tezSessionPoolManager.initTriggers(hiveConf); - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - tezSessionPoolManager.setupPool(hiveConf); - } } catch (Throwable t) { LOG.warn("Could not initiate the HiveServer2 Metrics system. 
Metrics may not be reported.", t); } @@ -179,51 +186,23 @@ public void run() { LlapRegistryService.getClient(hiveConf); } - Hive sessionHive = null; try { sessionHive = Hive.get(hiveConf); } catch (HiveException e) { throw new RuntimeException("Failed to get metastore connection", e); } - String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); - if (wmQueue != null && !wmQueue.isEmpty()) { - WMFullResourcePlan resourcePlan; - try { - resourcePlan = sessionHive.getActiveResourcePlan(); - } catch (HiveException e) { - throw new RuntimeException(e); - } - - if (resourcePlan == null) { - if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { - LOG.error("Cannot activate workload management - no active resource plan"); - } else { - LOG.info("Creating a default resource plan for test"); - resourcePlan = createTestResourcePlan(); - } - } - - if (resourcePlan != null) { - // Initialize workload management. - LOG.info("Initializing workload management"); - try { - wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); - } catch (ExecutionException | InterruptedException e) { - throw new ServiceException("Unable to instantiate Workload Manager", e); - } - } - tezSessionPoolManager.updateTriggers(resourcePlan); - LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); - } - // Create views registry HiveMaterializedViewsRegistry.get().init(sessionHive); + wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim(); + // Setup web UI + final int webUIPort; + final String webHost; try { - int webUIPort = - hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT); + webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT); + webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST); // We disable web UI in tests unless the test is explicitly setting a // unique web ui port so that we don't mess up ptests. 
boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST) && @@ -237,7 +216,7 @@ public void run() { LOG.info("Starting Web UI on port "+ webUIPort); HttpServer.Builder builder = new HttpServer.Builder("hiveserver2"); builder.setPort(webUIPort).setConf(hiveConf); - builder.setHost(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST)); + builder.setHost(webHost); builder.setMaxThreads( hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS)); builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE)); @@ -284,13 +263,29 @@ public void run() { } catch (IOException ie) { throw new ServiceException(ie); } - // Add a shutdown hook for catching SIGTERM & SIGINT - ShutdownHookManager.addShutdownHook(new Runnable() { - @Override - public void run() { - hiveServer2.stop(); + + this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); + this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_INTERACTIVE_HA_ENABLE); + + try { + if (serviceDiscovery) { + final String serviceUri = getServerInstanceURI(); + addConfsToPublish(hiveConf, confsToPublish, serviceUri); + if (activePassiveHA) { + hiveConf.set(INSTANCE_URI_CONFIG, serviceUri); + leaderLatchListener = new HS2LeaderLatchListener(this); + leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Leader Actions Handler Thread").build()); + hs2HARegistry = HiveServer2HARegistry.create(hiveConf); + hs2HARegistry.init(); + } } - }); + } catch (Exception e) { + throw new ServiceException(e); + } + + // Add a shutdown hook for catching SIGTERM & SIGINT + ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop()); } private WMFullResourcePlan createTestResourcePlan() { @@ -304,10 +299,10 @@ private WMFullResourcePlan createTestResourcePlan() { return resourcePlan; } - public static boolean isHTTPTransportMode(HiveConf hiveConf) { + public static boolean 
isHTTPTransportMode(Configuration hiveConf) { String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); if (transportMode == null) { - transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); + transportMode = hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname); } if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) { return true; @@ -315,8 +310,8 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) { return false; } - public static boolean isKerberosAuthMode(HiveConf hiveConf) { - String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION); + public static boolean isKerberosAuthMode(Configuration hiveConf) { + String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname); if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) { return true; } @@ -356,7 +351,7 @@ public static boolean isKerberosAuthMode(HiveConf hiveConf) { * @param hiveConf * @throws Exception */ - private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + private void addServerInstanceToZooKeeper(HiveConf hiveConf, Map confsToPublish) throws Exception { String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); String instanceURI = getServerInstanceURI(); @@ -397,8 +392,8 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) { // HiveServer2 configs that this instance will publish to ZooKeeper, // so that the clients can read these and configure themselves properly. 
- Map confsToPublish = new HashMap(); - addConfsToPublish(hiveConf, confsToPublish); + + addConfsToPublish(hiveConf, confsToPublish, instanceURI); // Publish configs for this instance as the data on the node znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish); } else { @@ -435,10 +430,12 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { * Add conf keys, values that HiveServer2 will publish to ZooKeeper. * @param hiveConf */ - private void addConfsToPublish(HiveConf hiveConf, Map confsToPublish) { + private void addConfsToPublish(HiveConf hiveConf, Map confsToPublish, String serviceUri) { // Hostname confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST)); + // Hostname:port + confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri); // Transport mode confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)); @@ -557,10 +554,16 @@ public synchronized void start() { super.start(); // If we're supporting dynamic service discovery, we'll add the service uri for this // HiveServer2 instance to Zookeeper as a znode. - HiveConf hiveConf = this.getHiveConf(); - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + HiveConf hiveConf = getHiveConf(); + if (serviceDiscovery) { try { - addServerInstanceToZooKeeper(hiveConf); + if (activePassiveHA) { + hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService); + hs2HARegistry.start(); + LOG.info("HS2 HA registry started"); + } else { + addServerInstanceToZooKeeper(hiveConf, confsToPublish); + } } catch (Exception e) { LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e); throw new ServiceException(e); @@ -575,22 +578,113 @@ public synchronized void start() { throw new ServiceException(e); } } + + if (!activePassiveHA) { + LOG.info("HS2 interactive HA not enabled. 
Starting tez sessions.."); + startOrReconnectTezSessions(); + } else { + LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader."); + } + } + + private static class HS2LeaderLatchListener implements LeaderLatchListener { + private HiveServer2 hiveServer2; + + HS2LeaderLatchListener(final HiveServer2 hs2) { + this.hiveServer2 = hs2; + } + + @Override + public void isLeader() { + LOG.info("HS2 instance became the LEADER. Starting/Reconnecting tez sessions.."); + hiveServer2.startOrReconnectTezSessions(); + } + + @Override + public void notLeader() { + LOG.info("HS2 instance LOST LEADERSHIP. Stopping/Disconnecting tez sessions.."); + hiveServer2.stopOrDisconnectTezSessions(); + } + } + + // TODO: this is handled in a separate thread in case of HSI-HA, pass threadlocal SessionState? + private void startOrReconnectTezSessions() { + LOG.info("Starting/Reconnecting tez sessions.."); + // TODO: add tez session reconnect after TEZ-3875 + WMFullResourcePlan resourcePlan = null; + if (!StringUtils.isEmpty(wmQueue)) { + try { + resourcePlan = sessionHive.getActiveResourcePlan(); + } catch (HiveException e) { + throw new RuntimeException(e); + } + + if (resourcePlan == null) { + if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) { + LOG.error("Cannot activate workload management - no active resource plan"); + } else { + LOG.info("Creating a default resource plan for test"); + resourcePlan = createTestResourcePlan(); + } + } + } + initAndStartTezSessionPoolManager(resourcePlan); + initAndStartWorkloadManager(resourcePlan); + } + + private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) { + // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid + // SessionState.get() return null during createTezDir + try { + // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. 
+ LOG.info("Initializing tez session pool manager"); + tezSessionPoolManager = TezSessionPoolManager.getInstance(); + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + tezSessionPoolManager.setupPool(hiveConf); + } + tezSessionPoolManager.startPool(hiveConf, resourcePlan); + LOG.info("Tez session pool manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to setup tez session pool", e); + } + } + + private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) { + if (!StringUtils.isEmpty(wmQueue)) { + if (resourcePlan != null) { + // Initialize workload management. + LOG.info("Initializing workload management"); + try { + wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan); + wm.start(); + LOG.info("Workload manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to instantiate and start Workload Manager", e); + } + } + } + } + + private void stopOrDisconnectTezSessions() { + LOG.info("Stopping/Disconnecting tez sessions."); + // There should already be an instance of the session pool manager. + // If not, ignoring is fine while stopping HiveServer2. if (tezSessionPoolManager != null) { try { - tezSessionPoolManager.startPool(); - LOG.info("Started tez session pool manager.."); + tezSessionPoolManager.stop(); + LOG.info("Stopped tez session pool manager."); } catch (Exception e) { - LOG.error("Error starting tez session pool manager: ", e); - throw new ServiceException(e); + LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); } } if (wm != null) { try { - wm.start(); - LOG.info("Started workload manager.."); + wm.stop(); + LOG.info("Stopped workload manager."); } catch (Exception e) { - LOG.error("Error starting workload manager", e); - throw new ServiceException(e); + LOG.error("Workload manager stop had an error during stop of HiveServer2. 
" + + "Shutting down HiveServer2 anyway.", e); } } } @@ -600,6 +694,12 @@ public synchronized void stop() { LOG.info("Shutting down HiveServer2"); HiveConf hiveConf = this.getHiveConf(); super.stop(); + if (hs2HARegistry != null) { + hs2HARegistry.unregisterLeaderLatchListener(leaderLatchListener); + shutdownExecutor(leaderActionsExecutorService); + hs2HARegistry.stop(); + LOG.info("HS2 HA registry stopped"); + } if (webServer != null) { try { webServer.stop(); @@ -618,32 +718,15 @@ public synchronized void stop() { } } // Remove this server instance from ZooKeeper if dynamic service discovery is set - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + if (serviceDiscovery && !activePassiveHA) { try { removeServerInstanceFromZooKeeper(); } catch (Exception e) { LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e); } } - // There should already be an instance of the session pool manager. - // If not, ignoring is fine while stopping HiveServer2. - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) && - tezSessionPoolManager != null) { - try { - tezSessionPoolManager.stop(); - } catch (Exception e) { - LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } - if (wm != null) { - try { - wm.stop(); - } catch (Exception e) { - LOG.error("Workload manager stop had an error during stop of HiveServer2. 
" - + "Shutting down HiveServer2 anyway.", e); - } - } + + stopOrDisconnectTezSessions(); if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { try { @@ -654,6 +737,22 @@ public synchronized void stop() { } } + private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) { + leaderActionsExecutorService.shutdown(); + try { + if (!leaderActionsExecutorService.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { + LOG.warn("Executor service did not terminate in the specified time {} sec", SHUTDOWN_TIME); + List droppedTasks = leaderActionsExecutorService.shutdownNow(); + LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); + } + } catch (InterruptedException e) { + LOG.warn("Executor service did not terminate in the specified time {} sec. Exception: {}", SHUTDOWN_TIME, + e.getMessage()); + List droppedTasks = leaderActionsExecutorService.shutdownNow(); + LOG.warn("Executor service was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); + } + } + @VisibleForTesting public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) { diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java new file mode 100644 index 0000000..b31d63c --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import org.apache.hadoop.hive.registry.ServiceInstanceSet; + +public interface HiveServer2HAInstanceSet extends ServiceInstanceSet { + + /** + * In Active/Passive setup, returns current active leader. + * + * @return leader instance + */ + HiveServer2Instance getLeader(); +} diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HARegistry.java b/service/src/java/org/apache/hive/service/server/HiveServer2HARegistry.java new file mode 100644 index 0000000..f8adc10 --- /dev/null +++ b/service/src/java/org/apache/hive/service/server/HiveServer2HARegistry.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import static org.apache.hive.service.server.HiveServer2.INSTANCE_URI_CONFIG; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hive.service.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class HiveServer2HARegistry extends ZkRegistryBase implements LeaderLatchListener, + ServiceRegistry, HiveServer2HAInstanceSet { + private static final Logger LOG = LoggerFactory.getLogger(HiveServer2HARegistry.class); + private static final String HTTP_ENDPOINT_NAME = "hs2-interactive-ha"; + private static final String SASL_LOGIN_CONTEXT_NAME = "HS2InteractiveHAZooKeeperClient"; + private static final String INSTANCE_PREFIX = 
"instance-";
+  private static final String INSTANCE_GROUP = "instances";
+  private static final String LEADER_LATCH_PATH = "/_LEADER";
+  private boolean isLeader;
+  private LeaderLatch leaderLatch;
+  private String leaderLatchPath;
+  private ServiceRecord srv;
+
+  // There are 2 paths under which the instances get registered
+  // 1) Standard path used by ZkRegistryBase where all instances register themselves (also stores metadata)
+  //    Secure: /hs2-interactive-ha-sasl/instances/instance-0000000000
+  //    Unsecure: /hs2-interactive-ha-unsecure/instances/instance-0000000000
+  // 2) Leader latch path used for HS2 HA Active/Passive configuration where all instances register under _LEADER
+  //    path but only one among them is the leader
+  //    Secure: /hs2-interactive-ha-sasl/_LEADER/xxxx-latch-0000000000
+  //    Unsecure: /hs2-interactive-ha-unsecure/_LEADER/xxxx-latch-0000000000
+  static HiveServer2HARegistry create(Configuration conf) {
+    String zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_INTERACTIVE_HA_REGISTRY_NAMESPACE);
+    Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace),
+      HiveConf.ConfVars.HIVE_SERVER2_INTERACTIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty");
+    String principal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+    String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+    String zkNameSpacePrefix = zkNameSpace + "-";
+    // FIXME: configure session timeout
+    return new HiveServer2HARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab,
+      SASL_LOGIN_CONTEXT_NAME, conf);
+  }
+
+  private HiveServer2HARegistry(final String instanceName, final String zkNamespacePrefix, final String leaderLatchPath,
+    final String krbPrincipal, final String krbKeytab, final String saslContextName, final Configuration conf) {
+    super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP,
+      saslContextName, krbPrincipal, krbKeytab, null);
+    this.leaderLatchPath = leaderLatchPath;
+    leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, UNIQUE_ID.toString());
+  }
+
+  public void init() {
+    final Map<String, String> confsToPublish = getConfsToPublish();
+    this.srv = new ServiceRecord();
+    for (Map.Entry<String, String> entry : confsToPublish.entrySet()) {
+      srv.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void start() throws IOException {
+    super.start();
+    leaderLatch.addListener(this);
+    try {
+      // all participating instances use the same latch path, and curator randomly chooses one instance to be leader
+      // which can be verified via leaderLatch.hasLeadership()
+      leaderLatch.start();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    register();
+    LOG.info("Registered HS2 with ZK. service record: {}", srv);
+  }
+
+  @Override
+  public String register() throws IOException {
+    return registerServiceRecord(srv);
+  }
+
+  @Override
+  public void unregister() throws IOException {
+    CloseableUtils.closeQuietly(leaderLatch);
+    unregisterInternal();
+  }
+
+  @Override
+  public ServiceInstanceSet<HiveServer2Instance> getInstances(final String component, final long clusterReadyTimeoutMs)
+    throws IOException {
+    throw new UnsupportedOperationException("Not supported to get instances by component name");
+  }
+
+  private void addEndpointToServiceRecord() throws IOException {
+    final String instanceUri = srv.get(INSTANCE_URI_CONFIG);
+    final URI uri = URI.create(instanceUri);
+    Endpoint urlEndpoint = RegistryTypeUtils.urlEndpoint(HTTP_ENDPOINT_NAME, ProtocolTypes.PROTOCOL_THRIFT, uri);
+    srv.addInternalEndpoint(urlEndpoint);
+    updateServiceRecord(srv);
+    LOG.info("Added {} endpoint to service record", urlEndpoint);
+  }
+
+  private void removeEndpointFromServiceRecord() throws IOException {
+    // there is no way to clear endpoint from service record, so creating a new one without endpoint
+    ServiceRecord serviceRecord = new ServiceRecord();
+    for (Map.Entry<String, String> entry : srv.attributes().entrySet()) {
+      serviceRecord.set(entry.getKey(), entry.getValue());
+    }
+    srv = serviceRecord;
+    updateServiceRecord(srv);
+    LOG.info("Removed endpoint from service record");
+  }
+
+  @Override
+  public void stop() {
+    CloseableUtils.closeQuietly(leaderLatch);
+    super.stop();
+  }
+
+  @Override
+  protected HiveServer2Instance createServiceInstance(final ServiceRecord srv) throws IOException {
+    return new HiveServer2Instance(srv, HTTP_ENDPOINT_NAME);
+  }
+
+  @Override
+  public synchronized void registerStateChangeListener(
+    final ServiceInstanceStateChangeListener<HiveServer2Instance> listener)
+    throws IOException {
+    super.registerStateChangeListener(listener);
+  }
+
+  @Override
+  public ApplicationId getApplicationId() throws IOException {
+    throw new UnsupportedOperationException("Not supported until HS2 runs as YARN application");
+  }
+
+  @Override
+  protected String getZkPathUser(final Configuration conf) {
+    return RegistryUtils.currentUser();
+  }
+
+  private boolean hasLeadership() {
+    return leaderLatch.hasLeadership();
+  }
+
+  // Do only lightweight actions in main-event handler thread. Time consuming operations are handled via separate
+  // executor service registered via registerLeaderLatchListener()
+  @Override
+  public void isLeader() {
+    // only leader publishes instance uri as endpoint which will be used by clients to make connections to HS2 via
+    // service discovery.
+    if (hasLeadership()) {
+      if (isLeader) {
+        LOG.info("Already handled the leadership event, ignoring..");
+        return;
+      }
+      try {
+        addEndpointToServiceRecord();
+        isLeader = true;
+      } catch (IOException e) {
+        throw new ServiceException("Leader unable to add endpoint to service record", e);
+      }
+      LOG.info("HS2 instance became the LEADER. Service record: {}", srv);
+    }
+  }
+
+  @Override
+  public void notLeader() {
+    if (!hasLeadership()) {
+      if (!isLeader) {
+        LOG.info("Already handled the lost leadership event, ignoring..");
+        return;
+      }
+      try {
+        removeEndpointFromServiceRecord();
+        isLeader = false;
+      } catch (IOException e) {
+        throw new ServiceException("Unable to remove endpoint from service record", e);
+      }
+      LOG.info("HS2 instance LOST LEADERSHIP. Service record: {}", srv);
+    }
+  }
+
+  @Override
+  public HiveServer2Instance getLeader() {
+    for (HiveServer2Instance hs2Instance : getAll()) {
+      if (hs2Instance.isLeader()) {
+        return hs2Instance;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Collection<HiveServer2Instance> getAll() {
+    return getAllInternal();
+  }
+
+  @Override
+  public HiveServer2Instance getInstance(final String instanceId) {
+    for (HiveServer2Instance hs2Instance : getAll()) {
+      if (hs2Instance.getInstanceId().equals(instanceId)) {
+        return hs2Instance;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Set<HiveServer2Instance> getByHost(final String host) {
+    return getByHostInternal(host);
+  }
+
+  @Override
+  public int size() {
+    return sizeInternal();
+  }
+
+  void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) {
+    leaderLatch.addListener(latchListener, executorService);
+  }
+
+  void unregisterLeaderLatchListener(final LeaderLatchListener latchListener) {
+    leaderLatch.removeListener(latchListener);
+  }
+
+  private Map<String, String> getConfsToPublish() {
+    final Map<String, String> confsToPublish = new HashMap<>();
+    // Hostname
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname));
+    // Hostname:port
+    confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG));
+    // Transport mode
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname));
+    // Transport specific confs
+    if (HiveServer2.isHTTPTransportMode(conf)) {
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname));
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname));
+    } else {
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname));
+    }
+    // Auth specific confs
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname));
+    if (HiveServer2.isKerberosAuthMode(conf)) {
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname));
+    }
+    // SSL conf
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname));
+    return confsToPublish;
+  }
+}
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HARegistryClient.java b/service/src/java/org/apache/hive/service/server/HiveServer2HARegistryClient.java
new file mode 100644
index 0000000..eaa5fd2
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2HARegistryClient.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+
+import com.google.common.base.Preconditions;
+
+public class HiveServer2HARegistryClient {
+  private static final Map<String, HiveServer2HARegistry> hs2Registries = new HashMap<>();
+
+  /**
+   * Helper method to get a HiveServer2HARegistry instance to read from the registry. Only used by clients (JDBC),
+   * service discovery to connect to active HS2 instance in Active/Passive HA configuration.
+   *
+   * @param conf {@link Configuration} instance which contains service registry information.
+   * @return HiveServer2HARegistry
+   */
+  public static synchronized HiveServer2HARegistry getClient(Configuration conf) throws IOException {
+    String namespace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_INTERACTIVE_HA_REGISTRY_NAMESPACE);
+    Preconditions.checkArgument(!StringUtils.isBlank(namespace),
+      HiveConf.ConfVars.HIVE_SERVER2_INTERACTIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty");
+    String nsKey = ZkRegistryBase.getRootNamespace(null, namespace + "-");
+    HiveServer2HARegistry registry = hs2Registries.get(nsKey);
+    if (registry == null) {
+      registry = HiveServer2HARegistry.create(conf);
+      registry.init();
+      registry.start();
+      hs2Registries.put(nsKey, registry); // must use the same key as the lookup above, else cache never hits
+    }
+    return registry;
+  }
+}
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
new file mode 100644
index 0000000..ec3b53b
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+public class HiveServer2Instance extends ServiceInstanceBase {
+  private String instanceId;
+  private boolean isLeader;
+  private String hostname;
+  private int port;
+  private String transportMode;
+  private String httpEndpoint;
+
+  public HiveServer2Instance(final ServiceRecord srv, final String endpointName) throws IOException {
+    super(srv, endpointName);
+
+    Endpoint endpoint = srv.getInternalEndpoint(endpointName);
+    if (endpoint != null) { // only the leader publishes this endpoint in its service record
+      isLeader = true;
+    }
+    this.instanceId = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    final String instanceUri = srv.get(HiveServer2.INSTANCE_URI_CONFIG);
+    final String[] tokens = instanceUri.split(":");
+    this.hostname = tokens[0];
+    this.port = Integer.parseInt(tokens[1]);
+    this.transportMode = srv.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
+    if (transportMode.equalsIgnoreCase("http")) {
+      this.httpEndpoint = srv.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname);
+    }
+    // TODO: see if other srv records are required to be handled
+  }
+
+  public boolean isLeader() {
+    return isLeader;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public String getTransportMode() {
+    return transportMode;
+  }
+
+  public String getHttpEndpoint() {
+    return httpEndpoint;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public String toString() {
+    String result = "instanceId: " + instanceId + " isLeader: " + isLeader + " host: " + hostname + " port: " + port +
+      " transportMode: " + transportMode;
+    if (httpEndpoint != null) {
+      result += " httpEndpoint: " + httpEndpoint;
+    }
+    return result;
+  }
+}