diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 74a8749..99e4de8 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1481,7 +1481,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, \n" + "2. When HiveServer2 supports service discovery via Zookeeper.\n" + "3. For delegation token storage if zookeeper store is used, if\n" + - "hive.cluster.delegation.token.store.zookeeper.connectString is not set"), + "hive.cluster.delegation.token.store.zookeeper.connectString is not set\n" + + "4. LLAP daemon registry service"), HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181", "The port of ZooKeeper servers to talk to.\n" + diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java index be811eb..73f94f3 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java @@ -55,11 +55,4 @@ */ public Set getByHost(String host); - /** - * Refresh the instance set from registry backing store. - * - * @throws IOException - */ - public void refresh() throws IOException; - } \ No newline at end of file diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java new file mode 100644 index 0000000..92eb8bd --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceStateChangeListener.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry; + +/** + * Callback listener for instance state change events + */ +public interface ServiceInstanceStateChangeListener { + /** + * Called when new {@link ServiceInstance} is created. + * + * @param serviceInstance - created service instance + */ + void onCreate(ServiceInstance serviceInstance); + + /** + * Called when an existing {@link ServiceInstance} is updated. + * + * @param serviceInstance - updated service instance + */ + void onUpdate(ServiceInstance serviceInstance); + + /** + * Called when an existing {@link ServiceInstance} is removed. + * + * @param serviceInstance - removed service instance + */ + void onRemove(ServiceInstance serviceInstance); +} diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index d3fb517..7ca7ec8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -22,38 +22,45 @@ /** * Start the service registry - * - * @throws InterruptedException + * + * @throws IOException */ - public void start() throws InterruptedException; + public void start() throws IOException; /** * Stop the service registry - * - * @throws InterruptedException + * + * @throws IOException */ - public void stop() throws InterruptedException; + public void stop() throws IOException; /** * Register the current instance - the implementation takes care of the endpoints to register. - * + * * @throws IOException */ public void register() throws IOException; /** * Remove the current registration cleanly (implementation defined cleanup) - * + * * @throws IOException */ public void unregister() throws IOException; /** * Client API to get the list of instances registered via the current registry key. - * + * * @param component * @return * @throws IOException */ public ServiceInstanceSet getInstances(String component) throws IOException; + + /** + * Adds state change listeners for service instances. + * + * @param listener - state change listener + */ + public void setStateChangeListener(ServiceInstanceStateChangeListener listener); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 92044bb..b4f697d 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -80,12 +81,12 @@ public LlapFixedRegistryImpl(String hosts, Configuration conf) { } @Override - public void start() throws InterruptedException { + public void start() throws IOException { // nothing to start } @Override - public void stop() throws InterruptedException { + public void stop() throws IOException { // nothing to stop } @@ -126,7 +127,6 @@ public FixedServiceInstance(String host) { this.host = host; } - @Override public String getWorkerIdentity() { return LlapFixedRegistryImpl.getWorkerIdentity(host); } @@ -223,12 +223,6 @@ public ServiceInstance getInstance(String name) { } return byHost; } - - @Override - public void refresh() throws IOException { - // I will do no such thing - } - } @Override @@ -237,6 +231,11 @@ public ServiceInstanceSet getInstances(String component) throws IOException { } @Override + public void setStateChangeListener(final ServiceInstanceStateChangeListener listener) { + // nothing to set + } + + @Override public String toString() { return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts)); } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 907faed..c50e4fe 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; import org.apache.hadoop.service.AbstractService; import org.slf4j.Logger; @@ -77,7 +78,7 @@ public static synchronized LlapRegistryService getClient(Configuration conf) { public void serviceInit(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); if (hosts.startsWith("@")) { - registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon); + registry = new LlapZookeeperRegistryImpl(hosts.substring(1), conf); } else { registry = new LlapFixedRegistryImpl(hosts, conf); } @@ -122,4 +123,8 @@ private void unregisterWorker() throws IOException { public ServiceInstanceSet getInstances() throws IOException { return this.registry.getInstances("LLAP"); } + + public void setServiceInstanceStateChangeListener(ServiceInstanceStateChangeListener listener) { + this.registry.setStateChangeListener(listener); + } } diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java deleted file mode 100644 index efe31cc..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.registry.impl; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.ServiceRegistry; -import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; -import org.apache.hadoop.registry.client.binding.RegistryPathUtils; -import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; -import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; -import org.apache.hadoop.registry.client.types.AddressTypes; -import org.apache.hadoop.registry.client.types.Endpoint; -import org.apache.hadoop.registry.client.types.ProtocolTypes; -import org.apache.hadoop.registry.client.types.ServiceRecord; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.zookeeper.CreateMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public class LlapYarnRegistryImpl implements ServiceRegistry { - - /** IPC endpoint names. */ - private static final String IPC_SERVICES = "services", - IPC_MNG = "llapmng", IPC_SHUFFLE = "shuffle", IPC_LLAP = "llap"; - - private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class); - - private final RegistryOperationsService client; - private final Configuration conf; - private final ServiceRecordMarshal encoder; - private final String path; - - private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet(); - - private static final UUID uniq = UUID.randomUUID(); - private static final String hostname; - - private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; - - private final static String SERVICE_CLASS = "org-apache-hive"; - - final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build()); - final long refreshDelay; - private final boolean isDaemon; - - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } - - public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) { - - LOG.info("Llap Registry is enabled with registryid: " + instanceName); - this.conf = new Configuration(conf); - conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - // registry reference - client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); - encoder = new RegistryUtils.ServiceRecordMarshal(); - this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), - SERVICE_CLASS, instanceName, "workers"), "worker-"); - refreshDelay = HiveConf.getTimeVar( - conf, ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, TimeUnit.SECONDS); - this.isDaemon = isDaemon; - Preconditions.checkArgument(refreshDelay > 0, - "Refresh delay for registry has to be positive = %d", refreshDelay); - } - - public Endpoint getRpcEndpoint() { - final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT); - return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort)); - } - - public Endpoint getShuffleEndpoint() { - final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); - // HTTP today, but might not be - return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, ProtocolTypes.PROTOCOL_TCP, hostname, - shufflePort); - } - - public Endpoint getServicesEndpoint() { - final int servicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); - final boolean isSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); - final String scheme = isSSL ? "https" : "http"; - final URL serviceURL; - try { - serviceURL = new URL(scheme, hostname, servicePort, ""); - return RegistryTypeUtils.webEndpoint(IPC_SERVICES, serviceURL.toURI()); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } catch (URISyntaxException e) { - throw new RuntimeException("llap service URI for " + hostname + " is invalid", e); - } - } - - public Endpoint getMngEndpoint() { - return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname, - HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT))); - } - - private final String getPath() { - return this.path; - } - - @Override - public void register() throws IOException { - String path = getPath(); - ServiceRecord srv = new ServiceRecord(); - srv.addInternalEndpoint(getRpcEndpoint()); - srv.addInternalEndpoint(getMngEndpoint()); - srv.addInternalEndpoint(getShuffleEndpoint()); - srv.addExternalEndpoint(getServicesEndpoint()); - - for (Map.Entry kv : this.conf) { - if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) - || kv.getKey().startsWith("hive.llap.")) { - // TODO: read this somewhere useful, like the task scheduler - srv.set(kv.getKey(), kv.getValue()); - } - } - - // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, uniq.toString()); - - client.mknode(RegistryPathUtils.parentOf(path), true); - - // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths - client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), - client.getClientAcls()); - } - - @Override - public void unregister() throws IOException { - // Nothing for the zkCreate models - } - - private class DynamicServiceInstance implements ServiceInstance { - - private final ServiceRecord srv; - private boolean alive = true; - private final String host; - private final int rpcPort; - private final int mngPort; - private final int shufflePort; - - public DynamicServiceInstance(ServiceRecord srv) throws IOException { - this.srv = srv; - - final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE); - final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP); - final Endpoint mng = srv.getInternalEndpoint(IPC_MNG); - - this.host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - this.rpcPort = - Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); - this.mngPort = - Integer.valueOf(RegistryTypeUtils.getAddressField(mng.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); - this.shufflePort = - Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); - } - - @Override - public String getWorkerIdentity() { - return srv.get(UNIQUE_IDENTIFIER); - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getRpcPort() { - return rpcPort; - } - - @Override - public int getShufflePort() { - return shufflePort; - } - - @Override - public boolean isAlive() { - return alive ; - } - - public void kill() { - // May be possible to generate a notification back to the scheduler from here. - LOG.info("Killing service instance: " + this); - this.alive = false; - } - - @Override - public Map getProperties() { - return srv.attributes(); - } - - @Override - public Resource getResource() { - int memory = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname)); - int vCores = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); - return Resource.newInstance(memory, vCores); - } - - @Override - public String toString() { - return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]"; - } - - @Override - public int getManagementPort() { - return mngPort; - } - - // Relying on the identity hashCode and equality, since refreshing instances retains the old copy - // of an already known instance. - } - - private class DynamicServiceInstanceSet implements ServiceInstanceSet { - - // LinkedHashMap to retain iteration order. - private final Map instances = new LinkedHashMap<>(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); - - @Override - public Map getAll() { - // Return a copy. Instances may be modified during a refresh. - readLock.lock(); - try { - return new LinkedHashMap<>(instances); - } finally { - readLock.unlock(); - } - } - - @Override - public List getAllInstancesOrdered() { - List list = new LinkedList<>(); - readLock.lock(); - try { - list.addAll(instances.values()); - } finally { - readLock.unlock(); - } - Collections.sort(list, new Comparator() { - @Override - public int compare(ServiceInstance o1, ServiceInstance o2) { - return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); - } - }); - return list; - } - - @Override - public ServiceInstance getInstance(String name) { - readLock.lock(); - try { - return instances.get(name); - } finally { - readLock.unlock(); - } - } - - @Override - public void refresh() throws IOException { - /* call this from wherever */ - Map freshInstances = new HashMap(); - - String path = getPath(); - Map records = - RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); - // Synchronize after reading the service records from the external service (ZK) - writeLock.lock(); - try { - Set latestKeys = new HashSet(); - LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this)); - for (ServiceRecord rec : records.values()) { - ServiceInstance instance = new DynamicServiceInstance(rec); - if (instance != null) { - if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) { - // add a new object - freshInstances.put(instance.getWorkerIdentity(), instance); - if (LOG.isInfoEnabled()) { - LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to " - + instance); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Retaining running worker " + instance.getWorkerIdentity() + - " which mapped to " + instance); - } - } - } - latestKeys.add(instance.getWorkerIdentity()); - } - - if (instances != null) { - // deep-copy before modifying - Set oldKeys = new HashSet<>(instances.keySet()); - if (oldKeys.removeAll(latestKeys)) { - // This is all the records which have not checked in, and are effectively dead. - for (String k : oldKeys) { - // this is so that people can hold onto ServiceInstance references as placeholders for tasks - final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k); - dead.kill(); - if (LOG.isInfoEnabled()) { - LOG.info("Deleting dead worker " + k + " which mapped to " + dead); - } - } - } - // oldKeys contains the set of dead instances at this point. - this.instances.keySet().removeAll(oldKeys); - this.instances.putAll(freshInstances); - } else { - this.instances.putAll(freshInstances); - } - } finally { - writeLock.unlock(); - } - } - - @Override - public Set getByHost(String host) { - // TODO Maybe store this as a map which is populated during construction, to avoid walking - // the map on each request. - readLock.lock(); - Set byHost = new HashSet(); - try { - for (ServiceInstance i : instances.values()) { - if (host.equals(i.getHost())) { - // all hosts in instances should be alive in this impl - byHost.add(i); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Locality comparing " + host + " to " + i.getHost()); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); - } - return byHost; - } finally { - readLock.unlock(); - } - } - } - - @Override - public ServiceInstanceSet getInstances(String component) throws IOException { - Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component - if (this.client != null) { - instances.refresh(); - return instances; - } else { - Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); - return null; - } - } - - @Override - public void start() { - if (client == null) return; - client.start(); - if (isDaemon) return; - refresher.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - instances.refresh(); - } catch (IOException ioe) { - LOG.warn("Could not refresh hosts during scheduled refresh", ioe); - } - } - }, 0, refreshDelay, TimeUnit.SECONDS); - } - - @Override - public void stop() { - if (client != null) { - client.stop(); - } - } -} diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java new file mode 100644 index 0000000..c69f069 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -0,0 +1,674 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.security.auth.login.AppConfigurationEntry; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class LlapZookeeperRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class); + + /** + * IPC endpoint names. + */ + private static final String IPC_SERVICES = "services"; + private static final String IPC_MNG = "llapmng"; + private static final String IPC_SHUFFLE = "shuffle"; + private static final String IPC_LLAP = "llap"; + private final static String ROOT_NAMESPACE = "llap"; + + private final Configuration conf; + private final CuratorFramework zooKeeperClient; + private final String pathPrefix; + private PersistentEphemeralNode znode; + private String znodePath; // unique identity for this instance + private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data + + // to be used by clients of ServiceRegistry + private DynamicServiceInstanceSet instances; + private PathChildrenCache instancesCache; + + // TODO: do we need multiple listeners? + private ServiceInstanceStateChangeListener stateChangeListener; + + // get local hostname + private static final String hostname; + + static { + String localhost = "localhost"; + try { + localhost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException uhe) { + // ignore + } + hostname = localhost; + } + + /** + * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory + */ + private final ACLProvider zooKeeperAclProvider = new ACLProvider() { + List nodeAcls = new ArrayList(); + + @Override + public List getDefaultAcl() { + if (UserGroupInformation.isSecurityEnabled()) { + // Read all to the world + nodeAcls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE); + // Create/Delete/Write/Admin to the authenticated user + nodeAcls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS)); + } else { + // ACLs for znodes on a non-kerberized cluster + // Create/Read/Delete/Write/Admin to the world + nodeAcls.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + return nodeAcls; + } + + @Override + public List getAclForPath(String path) { + return getDefaultAcl(); + } + }; + + public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { + this.conf = new Configuration(conf); + this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + String zkEnsemble = getQuorumServers(this.conf); + this.encoder = new RegistryUtils.ServiceRecordMarshal(); + int sessionTimeout = (int) HiveConf.getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, + TimeUnit.MILLISECONDS); + int baseSleepTime = (int) HiveConf + .getTimeVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, + TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); + + // Create a CuratorFramework instance to be used as the ZooKeeper client + // Use the zooKeeperAclProvider to create appropriate ACLs + this.zooKeeperClient = CuratorFrameworkFactory.builder() + .connectString(zkEnsemble) + .sessionTimeoutMs(sessionTimeout) + .aclProvider(zooKeeperAclProvider) + .namespace(ROOT_NAMESPACE) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)) + .build(); + + // sample path: /llap/hiveuser/hostname/workers/worker-0000000 + // worker-0000000 is the sequence number which will be retained until session timeout. If a + // worker does not respond due to communication interruptions it will retain the same sequence + // number when it returns back. If session timeout expires, the node will be deleted and new + // addition of the same node (restart) will get next sequence number + this.pathPrefix = "/" + RegistryUtils.currentUser() + "/" + instanceName + "/workers/worker-"; + this.instancesCache = null; + this.instances = null; + this.stateChangeListener = null; + LOG.info("Llap Zookeeper Registry is enabled with registryid: " + instanceName); + } + + /** + * Get the ensemble server addresses from the configuration. The format is: host1:port, + * host2:port.. + * + * @param conf + **/ + private String getQuorumServers(Configuration conf) { + String[] hosts = conf.get(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname).split(","); + String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, + ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); + StringBuilder quorum = new StringBuilder(); + for (int i = 0; i < hosts.length; i++) { + quorum.append(hosts[i].trim()); + if (!hosts[i].contains(":")) { + // if the hostname doesn't contain a port, add the configured port to hostname + quorum.append(":"); + quorum.append(port); + } + + if (i != hosts.length - 1) { + quorum.append(","); + } + } + + return quorum.toString(); + } + + public Endpoint getRpcEndpoint() { + final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT); + return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort)); + } + + public Endpoint getShuffleEndpoint() { + final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT); + // HTTP today, but might not be + return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, ProtocolTypes.PROTOCOL_TCP, hostname, + shufflePort); + } + + public Endpoint getServicesEndpoint() { + final int servicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + final boolean isSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); + final String scheme = isSSL ? "https" : "http"; + final URL serviceURL; + try { + serviceURL = new URL(scheme, hostname, servicePort, ""); + return RegistryTypeUtils.webEndpoint(IPC_SERVICES, serviceURL.toURI()); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + throw new RuntimeException("llap service URI for " + hostname + " is invalid", e); + } + } + + public Endpoint getMngEndpoint() { + return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname, + HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT))); + } + + @Override + public void register() throws IOException { + ServiceRecord srv = new ServiceRecord(); + Endpoint rpcEndpoint = getRpcEndpoint(); + srv.addInternalEndpoint(rpcEndpoint); + srv.addInternalEndpoint(getMngEndpoint()); + srv.addInternalEndpoint(getShuffleEndpoint()); + srv.addExternalEndpoint(getServicesEndpoint()); + + for (Map.Entry kv : this.conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.set(kv.getKey(), kv.getValue()); + } + } + + // Create a znode under the rootNamespace parent for this instance of the server + try { + // PersistentEphemeralNode will make sure the ephemeral node created on server will be present + // even under connection or session interruption (will automatically handle retries) + znode = new PersistentEphemeralNode(zooKeeperClient, + PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, encoder.toBytes(srv)); + + // start the creation of znode + znode.start(); + + // We'll wait for 120s for node creation + long znodeCreationTimeout = 120; + if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { + throw new Exception( + "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); + } + + znodePath = znode.getActualPath(); + // Set a watch on the znode + if (zooKeeperClient.checkExists() + .usingWatcher(new NodeChangeWatcher()) + .forPath(znodePath) == null) { + // No node exists, throw exception + throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper."); + } + LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint, + znodePath); + } catch (Exception e) { + LOG.error("Unable to create a znode for this server instance", e); + CloseableUtils.closeQuietly(znode); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv); + } + } + + private class NodeChangeWatcher implements Watcher { + @Override + public void process(WatchedEvent event) { + switch (event.getType()) { + case NodeCreated: + LOG.info("ZKNode created for this LLAP instance: Path - {}", event.getPath()); + break; + case NodeDataChanged: + LOG.info("ZKNode data changed for this LLAP instance: Path - {}", event.getPath()); + break; + case NodeDeleted: + LOG.info("ZKNode deleted for this LLAP instance: Path - {}", event.getPath()); + CloseableUtils.closeQuietly(znode); + break; + } + } + } + + @Override + public void unregister() throws IOException { + // Nothing for the zkCreate models + } + + private class DynamicServiceInstance implements ServiceInstance { + + private final ServiceRecord srv; + private boolean alive = true; + private final String host; + private final int rpcPort; + private final int mngPort; + private final int shufflePort; + + public DynamicServiceInstance(ServiceRecord srv) throws IOException { + this.srv = srv; + + final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE); + final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP); + final Endpoint mng = srv.getInternalEndpoint(IPC_MNG); + + this.host = + RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = + Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + this.mngPort = + Integer.valueOf(RegistryTypeUtils.getAddressField(mng.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + this.shufflePort = + Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + + @Override + public String getWorkerIdentity() { + return host + ":" + rpcPort; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public int getShufflePort() { + return shufflePort; + } + + @Override + public boolean isAlive() { + return alive; + } + + public void kill() { + // May be possible to generate a notification back to the scheduler from here. + LOG.info("Killing service instance: " + this); + this.alive = false; + } + + @Override + public Map getProperties() { + return srv.attributes(); + } + + @Override + public Resource getResource() { + int memory = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname)); + int vCores = Integer.valueOf(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname)); + return Resource.newInstance(memory, vCores); + } + + @Override + public String toString() { + return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + + " with resources=" + getResource() + "]"; + } + + @Override + public int getManagementPort() { + return mngPort; + } + + // Relying on the identity hashCode and equality, since refreshing instances retains the old copy + // of an already known instance. + } + + private class DynamicServiceInstanceSet implements ServiceInstanceSet { + private final PathChildrenCache instancesCache; + + public DynamicServiceInstanceSet(final PathChildrenCache cache) { + this.instancesCache = cache; + } + + @Override + public synchronized Map getAll() { + Map instances = new LinkedHashMap<>(); + for (ChildData childData : instancesCache.getCurrentData()) { + if (childData != null) { + byte[] data = childData.getData(); + if (data != null) { + try { + ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); + ServiceInstance instance = new DynamicServiceInstance(srv); + instances.put(childData.getPath(), instance); + } catch (IOException e) { + LOG.error("Unable to decode data for zkpath: {}." + + " Ignoring from current instances list..", childData.getPath()); + } + } + } + } + return instances; + } + + @Override + public List getAllInstancesOrdered() { + List list = new LinkedList<>(); + list.addAll(instances.getAll().values()); + Collections.sort(list, new Comparator() { + @Override + public int compare(ServiceInstance o1, ServiceInstance o2) { + return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity()); + } + }); + return list; + } + + @Override + public synchronized ServiceInstance getInstance(String name) { + ChildData childData = instancesCache.getCurrentData(name); + if (childData != null) { + byte[] data = childData.getData(); + if (data != null) { + try { + ServiceRecord srv = encoder.fromBytes(name, data); + ServiceInstance instance = new DynamicServiceInstance(srv); + return instance; + } catch (IOException e) { + LOG.error("Unable to decode data for zkpath: {}", name); + return null; + } + } + } + return null; + } + + @Override + public synchronized Set getByHost(String host) { + Set byHost = new HashSet<>(); + for (ChildData childData : instancesCache.getCurrentData()) { + if (childData != null) { + byte[] data = childData.getData(); + if (data != null) { + try { + ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); + ServiceInstance instance = new DynamicServiceInstance(srv); + if (host.equals(instance.getHost())) { + byHost.add(instance); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Locality comparing " + host + " to " + instance.getHost()); + } + } catch (IOException e) { + LOG.error("Unable to decode data for zkpath: {}." + + " Ignoring host from current instances list..", childData.getPath()); + } + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); + } + return byHost; + } + } + + private class InstanceStateChangeListener implements PathChildrenCacheListener { + private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); + + @Override + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) + throws Exception { + Preconditions.checkArgument(client != null + && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); + ServiceInstance instance = null; + if (stateChangeListener != null) { + if (stateChangeListener != null) { + ChildData childData = event.getData(); + if (childData != null) { + byte[] data = childData.getData(); + if (data != null) { + try { + ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data); + instance = new DynamicServiceInstance(srv); + } catch (IOException e) { + LOG.error("Unable to decode data for zknode: {}." + + " Dropping notification of type: {}", childData.getPath(), event.getType()); + } + } + } + } + } + + if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { + LOG.info("Added zknode {} to llap namespace. Notifying state change listener.", + event.getData().getPath()); + stateChangeListener.onCreate(instance); + } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { + LOG.info("Updated zknode {} in llap namespace. Notifying state change listener.", + event.getData().getPath()); + stateChangeListener.onUpdate(instance); + } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { + LOG.info("Removed zknode {} from llap namespace. Notifying state change listener.", + event.getData().getPath()); + stateChangeListener.onRemove(instance); + } + } + } + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component + // TODO: if not daemon? + startPathChildrenCache(); + return instances; + } + + @Override + public void setStateChangeListener(final ServiceInstanceStateChangeListener listener) { + this.stateChangeListener = listener; + } + + private void startPathChildrenCache() throws IOException { + Preconditions.checkArgument( + zooKeeperClient != null && zooKeeperClient.getState() == CuratorFrameworkState.STARTED, + "client is not started"); + + // lazily create PathChildrenCache + if (instancesCache == null) { + this.instancesCache = new PathChildrenCache(zooKeeperClient, + RegistryPathUtils.parentOf(pathPrefix).toString(), true); + instancesCache.getListenable().addListener(new InstanceStateChangeListener()); + try { + this.instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + } catch (Exception e) { + LOG.error("Unable to start curator PathChildrenCache. Exception: {}", e); + throw new IOException(e); + } + } + + // lazily create instances + if (instances == null) { + this.instances = new DynamicServiceInstanceSet(instancesCache); + } + + } + + @Override + public void start() throws IOException { + if (zooKeeperClient != null) { + setupZookeeperAuth(this.conf); + zooKeeperClient.start(); + } + } + + @Override + public void stop() throws IOException { + CloseableUtils.closeQuietly(znode); + CloseableUtils.closeQuietly(instancesCache); + CloseableUtils.closeQuietly(zooKeeperClient); + } + + + private void setupZookeeperAuth(final Configuration conf) throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("UGI security is enabled. Setting up ZK auth."); + + String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL); + if (llapPrincipal == null || llapPrincipal.isEmpty()) { + throw new IOException("Llap Kerberos principal is empty"); + } + + String llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); + if (llapKeytab == null || llapKeytab.isEmpty()) { + throw new IOException("Llap Kerberos keytab is empty"); + } + + // Install the JAAS Configuration for the runtime + setZookeeperClientKerberosJaasConfig(llapPrincipal, llapKeytab); + } else { + LOG.info("UGI security is not enabled. Skipping setting up ZK auth."); + } + } + + /** + * Dynamically sets up the JAAS configuration that uses kerberos + * + * @param principal + * @param keyTabFile + * @throws IOException + */ + private void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) + throws IOException { + // ZooKeeper property name to pick the correct JAAS conf section + final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME); + + principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0"); + JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, + keyTabFile); + + // Install the Configuration in the runtime. + javax.security.auth.login.Configuration.setConfiguration(jaasConf); + } + + /** + * A JAAS configuration for ZooKeeper clients intended to use for SASL + * Kerberos. + */ + private static class JaasConfiguration extends javax.security.auth.login.Configuration { + // Current installed Configuration + private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration + .getConfiguration(); + private final String loginContextName; + private final String principal; + private final String keyTabFile; + + public JaasConfiguration(String llapLoginContextName, String principal, String keyTabFile) { + this.loginContextName = llapLoginContextName; + this.principal = principal; + this.keyTabFile = keyTabFile; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + if (loginContextName.equals(appName)) { + Map krbOptions = new HashMap(); + krbOptions.put("doNotPrompt", "true"); + krbOptions.put("storeKey", "true"); + krbOptions.put("useKeyTab", "true"); + krbOptions.put("principal", principal); + krbOptions.put("keyTab", keyTabFile); + krbOptions.put("refreshKrb5Config", "true"); + AppConfigurationEntry llapZooKeeperClientEntry = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, krbOptions); + return new AppConfigurationEntry[]{llapZooKeeperClientEntry}; + } + // Try the base config + if (baseConfig != null) { + return baseConfig.getAppConfigurationEntry(appName); + } + return null; + } + } +} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 7d7fa00..fe85f48 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -107,6 +107,14 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor "Work dirs must be specified"); Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536), "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection"); + String hosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); + if (hosts.startsWith("@")) { + String zkHosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + LOG.info("Zookeeper Quorum: {}", zkHosts); + Preconditions.checkArgument(zkHosts != null && !zkHosts.trim().isEmpty(), + "LLAP service hosts startswith '@' but hive.zookeeper.quorum is not set." + + " hive.zookeeper.quorum must be set."); + } this.maxJvmMemory = getTotalHeapSize(); this.llapIoEnabled = ioEnabled; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java index a00b631..4b760c3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java @@ -146,7 +146,6 @@ public ByteString run() throws Exception { } Map daemons = activeInstances.getAll(); if (doForceRefresh || daemons == null || daemons.isEmpty()) { - activeInstances.refresh(); daemons = activeInstances.getAll(); if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found"); } diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 6beb4f8..e76615e 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -91,7 +92,7 @@ // Tracks all instances, including ones which have been disabled in the past. // LinkedHashMap to provide the same iteration order when selecting a random host. @VisibleForTesting - final Map instanceToNodeMap = new LinkedHashMap<>(); + final Map instanceToNodeMap = new LinkedHashMap<>(); // TODO Ideally, remove elements from this once it's known that no tasks are linked to the instance (all deallocated) // Tracks tasks which could not be allocated immediately. @@ -258,6 +259,7 @@ public void onFailure(Throwable t) { } }); registry.start(); + registry.setServiceInstanceStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); for (ServiceInstance inst : activeInstances.getAll().values()) { addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); @@ -267,6 +269,30 @@ public void onFailure(Throwable t) { } } + private class NodeStateChangeListener implements ServiceInstanceStateChangeListener { + private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class); + + @Override + public void onCreate(final ServiceInstance serviceInstance) { + addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock, + numSchedulableTasksPerNode)); + LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity()); + } + + @Override + public void onUpdate(final ServiceInstance serviceInstance) { + instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, + nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); + } + + @Override + public void onRemove(final ServiceInstance serviceInstance) { + instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); + LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); + } + } + @Override public void shutdown() { writeLock.lock(); @@ -328,9 +354,9 @@ public Resource getAvailableResources() { int vcores = 0; readLock.lock(); try { - for (Entry entry : instanceToNodeMap.entrySet()) { - if (entry.getKey().isAlive() && !entry.getValue().isDisabled()) { - Resource r = entry.getKey().getResource(); + for (Entry entry : instanceToNodeMap.entrySet()) { + if (entry.getValue().getServiceInstance().isAlive() && !entry.getValue().isDisabled()) { + Resource r = entry.getValue().getServiceInstance().getResource(); memory += r.getMemory(); vcores += r.getVirtualCores(); } @@ -440,7 +466,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd ServiceInstance assignedInstance = taskInfo.assignedInstance; assert assignedInstance != null; - NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); + NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance.getWorkerIdentity()); assert nodeInfo != null; // Re-enable the node if preempted @@ -538,7 +564,6 @@ private SelectHostResult selectHost(TaskInfo request) { if (LOG.isDebugEnabled()) { LOG.debug("Refreshing instances since total memory is 0"); } - refreshInstances(); } // If there's no memory available, fail @@ -556,7 +581,7 @@ private SelectHostResult selectHost(TaskInfo request) { if (!instances.isEmpty()) { requestedHostExists = true; for (ServiceInstance inst : instances) { - NodeInfo nodeInfo = instanceToNodeMap.get(inst); + NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); if (nodeInfo != null && nodeInfo.canAcceptTask()) { LOG.info("Assigning " + inst + " when looking for " + host + "." + " FirstRequestedHost=" + (prefHostCount == 0) + @@ -584,20 +609,19 @@ private SelectHostResult selectHost(TaskInfo request) { } } /* fall through - miss in locality (random scheduling) */ - Entry[] all = - instanceToNodeMap.entrySet().toArray(new Entry[instanceToNodeMap.size()]); + Entry[] all = instanceToNodeMap.entrySet().toArray(new Entry[0]); // Check again if (all.length > 0) { int n = random.nextInt(all.length); // start at random offset and iterate whole list for (int i = 0; i < all.length; i++) { - Entry inst = all[(i + n) % all.length]; + Entry inst = all[(i + n) % all.length]; if (inst.getValue().canAcceptTask()) { LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length + ", requestedHosts=" + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : Arrays.toString(requestedHosts))); - return new SelectHostResult(inst.getKey(), inst.getValue()); + return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue()); } } } @@ -607,24 +631,12 @@ private SelectHostResult selectHost(TaskInfo request) { } } - // TODO Each refresh operation should addNodes if they don't already exist. - // Even better would be to get notifications from the service impl when a node gets added or removed. - // Instead of having to walk through the entire list. The computation of a node getting added or - // removed already exists in the DynamicRegistry implementation. - private void refreshInstances() { - try { - activeInstances.refresh(); // handles its own sync - } catch (IOException ioe) { - LOG.warn("Could not refresh list of active instances", ioe); - } - } - private void scanForNodeChanges() { /* check again whether nodes are disabled or just missing */ writeLock.lock(); try { for (ServiceInstance inst : activeInstances.getAll().values()) { - if (inst.isAlive() && instanceToNodeMap.containsKey(inst) == false) { + if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) { /* that's a good node, not added to the allocations yet */ LOG.info("Found a new node: " + inst + "."); addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); @@ -637,7 +649,7 @@ private void scanForNodeChanges() { private void addNode(ServiceInstance inst, NodeInfo node) { LOG.info("Adding node: " + inst); - instanceToNodeMap.put(inst, node); + instanceToNodeMap.put(inst.getWorkerIdentity(), node); // Trigger scheduling since a new node became available. trySchedulingPendingTasks(); } @@ -645,11 +657,6 @@ private void addNode(ServiceInstance inst, NodeInfo node) { private void reenableDisabledNode(NodeInfo nodeInfo) { writeLock.lock(); try { - if (nodeInfo.hadCommFailure()) { - // If the node being re-enabled was not marked busy previously, then it was disabled due to - // some other failure. Refresh the service list to see if it's been removed permanently. - refreshInstances(); - } LOG.info("Attempting to re-enable node: " + nodeInfo.getServiceInstance()); if (nodeInfo.getServiceInstance().isAlive()) { nodeInfo.enableNode(); @@ -666,7 +673,7 @@ private void reenableDisabledNode(NodeInfo nodeInfo) { private void disableInstance(ServiceInstance instance, boolean isCommFailure) { writeLock.lock(); try { - NodeInfo nodeInfo = instanceToNodeMap.get(instance); + NodeInfo nodeInfo = instanceToNodeMap.get(instance.getWorkerIdentity()); if (nodeInfo == null || nodeInfo.isDisabled()) { if (LOG.isDebugEnabled()) { LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything."); @@ -1012,7 +1019,6 @@ public Void call() { if (LOG.isDebugEnabled()) { LOG.debug("Refreshing instances based on poll interval"); } - refreshInstances(); scanForNodeChanges(); } }