diff --git llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index 381c16b..62695b7 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -67,6 +67,9 @@ public LlapConfiguration() { // Section for configs used in the AM // public static final String LLAP_DAEMON_SERVICE_HOSTS = LLAP_DAEMON_PREFIX + "service.hosts"; + public static final String LLAP_DAEMON_SERVICE_REFRESH_INTERVAL = LLAP_DAEMON_PREFIX + "service.refresh.interval"; + public static final int LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT = 60; // seconds + public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads"; public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java index 4517375..67a099a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java @@ -85,6 +85,9 @@ public LlapOptionsProcessor() { options.addOption(OptionBuilder.hasArg().withArgName("loglevel").withLongOpt("loglevel") .withDescription("log levels for the llap instance").create('l')); + options.addOption(OptionBuilder.hasArg().withArgName("chaosmonkey").withLongOpt("chaosmonkey") + .withDescription("chaosmonkey interval").create('m')); + // [-H|--help] options.addOption(new Option("H", "help", false, "Print help information")); } @@ -101,7 +104,7 @@ public LlapOptions processOptions(String argv[]) throws ParseException { String directory = commandLine.getOptionValue("directory"); String name = commandLine.getOptionValue("name", null); 
- // loglevel & args are parsed by the python processor + // loglevel, chaosmonkey & args are parsed by the python processor return new LlapOptions(name, instances, directory); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java new file mode 100644 index 0000000..f0f22aa --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.Resource; + +public interface ServiceInstance { + + /** + * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought + * back on the same host/port + */ + public String getWorkerIdentity(); + + /** + * Hostname of the service instance + * + * @return + */ + public String getHost(); + + /** + * RPC Endpoint for service instance + * + * @return + */ + public int getRpcPort(); + + /** + * Shuffle Endpoint for service instance + * + * @return + */ + public int getShufflePort(); + + /** + * Return the last known state (without refreshing) + * + * @return + */ + + public boolean isAlive(); + + /** + * Config properties of the Service Instance (llap.daemon.*) + * + * @return + */ + + public Map getProperties(); + + /** + * Memory and Executors available for the LLAP tasks + * + * This does not include the size of the cache or the actual vCores allocated via Slider. + * + * @return + */ + public Resource getResource(); +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java new file mode 100644 index 0000000..7ab36d4 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public interface ServiceInstanceSet { + + /** + * Get an instance mapping which map worker identity to each instance. + * + * The worker identity does not collide between restarts, so each restart will have a unique id, + * while having the same host/ip pair. + * + * @return + */ + public Map getAll(); + + /** + * Get an instance by worker identity. + * + * @param name + * @return + */ + public ServiceInstance getInstance(String name); + + /** + * Get a list of service instances for a given host. + * + * The list could include dead and alive instances. + * + * @param host + * @return + */ + public Set getByHost(String host); + + /** + * Refresh the instance set from registry backing store. + * + * @throws IOException + */ + public void refresh() throws IOException; + +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java new file mode 100644 index 0000000..a0f9aac --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; + +/** + * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. + */ +public interface ServiceRegistry { + + /** + * Start the service registry + * + * @throws InterruptedException + */ + public void start() throws InterruptedException; + + /** + * Stop the service registry + * + * @throws InterruptedException + */ + public void stop() throws InterruptedException; + + /** + * Register the current instance - the implementation takes care of the endpoints to register. + * + * @throws IOException + */ + public void register() throws IOException; + + /** + * Remove the current registration cleanly (implementation defined cleanup) + * + * @throws IOException + */ + public void unregister() throws IOException; + + /** + * Client API to get the list of instances registered via the current registry key. + * + * @param component + * @return + * @throws IOException + */ + public ServiceInstanceSet getInstances(String component) throws IOException; +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java new file mode 100644 index 0000000..c600e74 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java @@ -0,0 +1,204 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.log4j.Logger; + +public class LlapFixedRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = Logger.getLogger(LlapFixedRegistryImpl.class); + + private final int port; + private final int shuffle; + private final String[] hosts; + private final int memory; + private final int vcores; + + private final Map srv = new HashMap(); + + public LlapFixedRegistryImpl(String hosts, Configuration conf) { + this.hosts = hosts.split(","); + this.port = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + this.shuffle = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + + for (Map.Entry kv : conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.put(kv.getKey(), kv.getValue()); + } + } + + this.memory = + conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + 
LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); + this.vcores = + conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + } + + @Override + public void start() throws InterruptedException { + // nothing to start + } + + @Override + public void stop() throws InterruptedException { + // nothing to stop + } + + @Override + public void register() throws IOException { + // nothing to register + } + + @Override + public void unregister() throws IOException { + // nothing to unregister + } + + public static String getWorkerIdentity(String host) { + // trigger clean errors for anyone who mixes up identity with hosts + return "host-" + host; + } + + private final class FixedServiceInstance implements ServiceInstance { + + private final String host; + + public FixedServiceInstance(String host) { + try { + InetAddress inetAddress = InetAddress.getByName(host); + if (NetUtils.isLocalAddress(inetAddress)) { + InetSocketAddress socketAddress = new InetSocketAddress(0); + socketAddress = NetUtils.getConnectAddress(socketAddress); + LOG.info("Adding host identified as local: " + host + " as " + + socketAddress.getHostName()); + host = socketAddress.getHostName(); + } + } catch (UnknownHostException e) { + LOG.warn("Ignoring resolution issues for host: " + host, e); + } + this.host = host; + } + + @Override + public String getWorkerIdentity() { + return LlapFixedRegistryImpl.getWorkerIdentity(host); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + // TODO: allow >1 port per host? 
+ return LlapFixedRegistryImpl.this.port; + } + + @Override + public int getShufflePort() { + return LlapFixedRegistryImpl.this.shuffle; + } + + @Override + public boolean isAlive() { + return true; + } + + @Override + public Map getProperties() { + Map properties = new HashMap<>(srv); + // no worker identity + return properties; + } + + @Override + public Resource getResource() { + return Resource.newInstance(memory, vcores); + } + + } + + private final class FixedServiceInstanceSet implements ServiceInstanceSet { + + private final Map instances = new HashMap(); + + public FixedServiceInstanceSet() { + for (String host : hosts) { + // trigger bugs in anyone who uses this as a hostname + instances.put(getWorkerIdentity(host), new FixedServiceInstance(host)); + } + } + + @Override + public Map getAll() { + return instances; + } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public Set getByHost(String host) { + Set byHost = new HashSet(); + ServiceInstance inst = getInstance(getWorkerIdentity(host)); + if (inst != null) { + byHost.add(inst); + } + return byHost; + } + + @Override + public void refresh() throws IOException { + // I will do no such thing + } + + } + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + return new FixedServiceInstanceSet(); + } + + @Override + public String toString() { + return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts)); + } +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java index 67596fe..79042c5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java @@ -1,16 +1,29 @@ 
+/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.llap.daemon.registry.impl; import java.io.IOException; -import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; -import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; @@ -30,25 +43,8 @@ public class LlapRegistryService extends AbstractService { private static final Logger LOG = Logger.getLogger(LlapRegistryService.class); - - public final static String SERVICE_CLASS = "org-apache-hive"; - - private RegistryOperationsService client; - private String instanceName; - private Configuration conf; - private ServiceRecordMarshal encoder; - - private static final String hostname; - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = 
localhost; - } + private ServiceRegistry registry = null; public LlapRegistryService() { super("LlapRegistryService"); @@ -56,92 +52,48 @@ public LlapRegistryService() { @Override public void serviceInit(Configuration conf) { - String registryId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - if (registryId.startsWith("@")) { - LOG.info("Llap Registry is enabled with registryid: " + registryId); - this.conf = new Configuration(conf); - conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - // registry reference - instanceName = registryId.substring(1); - client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); - encoder = new RegistryUtils.ServiceRecordMarshal(); - + String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); + if (hosts.startsWith("@")) { + registry = initRegistry(hosts.substring(1), conf); } else { - LOG.info("Llap Registry is disabled"); + registry = new LlapFixedRegistryImpl(hosts, conf); } + LOG.info("Using LLAP registry type " + registry); + } + + private ServiceRegistry initRegistry(String instanceName, Configuration conf) { + return new LlapYarnRegistryImpl(instanceName, conf); } @Override public void serviceStart() throws Exception { - if (client != null) { - client.start(); + if (this.registry != null) { + this.registry.start(); } } + @Override public void serviceStop() throws Exception { - if (client != null) { - client.stop(); + if (this.registry != null) { + this.registry.stop(); + } else { + LOG.warn("Stopping non-existent registry service"); } } - public Endpoint getRpcEndpoint() { - final int rpcPort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - - return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); - } - - public Endpoint getShuffleEndpoint() { - final int shufflePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - 
LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); - // HTTP today, but might not be - return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, - shufflePort); - } - - private final String getPath() { - return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), - SERVICE_CLASS, instanceName, "workers"), "worker-"); - } - public void registerWorker() throws IOException { - if (this.client != null) { - String path = getPath(); - ServiceRecord srv = new ServiceRecord(); - srv.addInternalEndpoint(getRpcEndpoint()); - srv.addInternalEndpoint(getShuffleEndpoint()); - - for (Map.Entry kv : this.conf) { - if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)) { - // TODO: read this somewhere useful, like the allocator - srv.set(kv.getKey(), kv.getValue()); - } - } - - client.mknode(RegistryPathUtils.parentOf(path), true); - - // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths - client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), - client.getClientAcls()); + if (this.registry != null) { + this.registry.register(); } } - public void unregisterWorker() { - if (this.client != null) { - // with ephemeral nodes, there's nothing to do here - // because the create didn't return paths + public void unregisterWorker() throws IOException { + if (this.registry != null) { + this.registry.unregister(); } } - public Map getWorkers() throws IOException { - if (this.client != null) { - String path = getPath(); - return RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); - } else { - Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); - return null; - } + public ServiceInstanceSet getInstances() throws IOException { + return this.registry.getInstances("LLAP"); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java new file mode 100644 index 0000000..abf844f --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java @@ -0,0 +1,339 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import 
org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; + +import com.google.common.base.Preconditions; + +public class LlapYarnRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class); + + private RegistryOperationsService client; + private String instanceName; + private Configuration conf; + private ServiceRecordMarshal encoder; + + private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet(); + + private static final UUID uniq = UUID.randomUUID(); + private static final String hostname; + + private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; + + private final static String SERVICE_CLASS = "org-apache-hive"; + + final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1); + final long refreshDelay; + + static { + String localhost = "localhost"; + try { + localhost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException uhe) { + // ignore + } + hostname = localhost; + } + + public LlapYarnRegistryImpl(String instanceName, Configuration conf) { + + LOG.info("Llap Registry is enabled with registryid: " + instanceName); + this.conf = new Configuration(conf); + this.instanceName = instanceName; + conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + // registry reference + client = (RegistryOperationsService) 
RegistryOperationsFactory.createInstance(conf); + encoder = new RegistryUtils.ServiceRecordMarshal(); + refreshDelay = + conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, + LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT); + Preconditions.checkArgument(refreshDelay > 0, + "Refresh delay for registry has to be positive = %s", refreshDelay); + } + + public Endpoint getRpcEndpoint() { + final int rpcPort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); + } + + public Endpoint getShuffleEndpoint() { + final int shufflePort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + // HTTP today, but might not be + return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, + shufflePort); + } + + private final String getPath() { + return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), + SERVICE_CLASS, instanceName, "workers"), "worker-"); + } + + @Override + public void register() throws IOException { + String path = getPath(); + ServiceRecord srv = new ServiceRecord(); + srv.addInternalEndpoint(getRpcEndpoint()); + srv.addInternalEndpoint(getShuffleEndpoint()); + + for (Map.Entry kv : this.conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.set(kv.getKey(), kv.getValue()); + } + } + + // restart sensitive instance id + srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + + client.mknode(RegistryPathUtils.parentOf(path), true); + + // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths + client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), + client.getClientAcls()); + } + + @Override + 
public void unregister() throws IOException { + // Nothing for the zkCreate models + } + + private class DynamicServiceInstance implements ServiceInstance { + + private final ServiceRecord srv; + private boolean alive = true; + private final String host; + private final int rpcPort; + private final int shufflePort; + + public DynamicServiceInstance(ServiceRecord srv) throws IOException { + this.srv = srv; + + final Endpoint shuffle = srv.getInternalEndpoint("shuffle"); + final Endpoint rpc = srv.getInternalEndpoint("llap"); + + this.host = + RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = + Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + this.shufflePort = + Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + + @Override + public String getWorkerIdentity() { + return srv.get(UNIQUE_IDENTIFIER); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public int getShufflePort() { + return shufflePort; + } + + @Override + public boolean isAlive() { + return alive ; + } + + public void kill() { + LOG.info("Killing " + this); + this.alive = false; + } + + @Override + public Map getProperties() { + return srv.attributes(); + } + + @Override + public Resource getResource() { + int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); + int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS)); + return Resource.newInstance(memory, vCores); + } + + @Override + public String toString() { + return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + "]"; + } + } + + private class DynamicServiceInstanceSet implements ServiceInstanceSet { + + Map instances; + + @Override + public Map getAll() { + return instances; 
+ } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public synchronized void refresh() throws IOException { + /* call this from wherever */ + Map freshInstances = new HashMap(); + + String path = getPath(); + Map records = + RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); + Set latestKeys = new HashSet(); + LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this)); + for (ServiceRecord rec : records.values()) { + ServiceInstance instance = new DynamicServiceInstance(rec); + if (instance != null) { + if (instances == null || instances.containsKey(instance.getWorkerIdentity()) == false) { + // add a new object + freshInstances.put(instance.getWorkerIdentity(), instance); + if (LOG.isInfoEnabled()) { + LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to " + + instance); + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("Retaining running worker " + instance.getWorkerIdentity() + " which mapped to " + instance); + } + } + latestKeys.add(instance.getWorkerIdentity()); + } + + if (instances != null) { + // deep-copy before modifying + Set oldKeys = new HashSet(instances.keySet()); + if (oldKeys.removeAll(latestKeys)) { + for (String k : oldKeys) { + // this is so that people can hold onto ServiceInstance references as placeholders for tasks + final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k); + dead.kill(); + if (LOG.isInfoEnabled()) { + LOG.info("Deleting dead worker " + k + " which mapped to " + dead); + } + } + } + this.instances.keySet().removeAll(oldKeys); + this.instances.putAll(freshInstances); + } else { + this.instances = freshInstances; + } + } + + @Override + public Set getByHost(String host) { + Set byHost = new HashSet(); + for (ServiceInstance i : instances.values()) { + if (host.equals(i.getHost())) { + // all hosts in instances should be alive in this impl + byHost.add(i); + } 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Locality comparing " + host + " to " + i.getHost()); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); + } + return byHost; + } + } + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component + if (this.client != null) { + instances.refresh(); + return instances; + } else { + Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); + return null; + } + } + + @Override + public void start() { + if (client != null) { + client.start(); + refresher.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + instances.refresh(); + } catch (IOException ioe) { + LOG.warn("Could not refresh hosts during scheduled refresh", ioe); + } + } + }, 0, refreshDelay, TimeUnit.SECONDS); + } + } + + @Override + public void stop() { + if (client != null) { + client.stop(); + } + } +} diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index a50b159..147e0a9 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -15,16 +15,12 @@ package org.apache.tez.dag.app.rm; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -42,24 +38,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; -import org.apache.hadoop.registry.client.types.AddressTypes; -import org.apache.hadoop.registry.client.types.Endpoint; -import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -67,12 +55,16 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; 
+import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapTaskSchedulerService extends TaskSchedulerService { @@ -83,59 +75,55 @@ private final ExecutorService appCallbackExecutor; private final TaskSchedulerAppCallback appClientDelegate; - // Set of active hosts + // interface into the registry service + private ServiceInstanceSet activeInstances; + @VisibleForTesting - final LinkedHashMap activeHosts = new LinkedHashMap<>(); - // Populated each time activeHosts is modified + final Map instanceToNodeMap = new HashMap<>(); + @VisibleForTesting - String []activeHostList; + final Set instanceBlackList = new HashSet(); - // Set of all hosts in the system. @VisibleForTesting - final ConcurrentMap allHosts = new ConcurrentHashMap<>(); - // Tracks currently allocated containers. - private final Map containerToHostMap = new HashMap<>(); + final Map containerToInstanceMap = new HashMap<>(); // Tracks tasks which could not be allocated immediately. @VisibleForTesting - final TreeMap> pendingTasks = - new TreeMap<>(new Comparator() { - @Override - public int compare(Priority o1, Priority o2) { - return o1.getPriority() - o2.getPriority(); - } - }); + final TreeMap> pendingTasks = new TreeMap<>(new Comparator() { + @Override + public int compare(Priority o1, Priority o2) { + return o1.getPriority() - o2.getPriority(); + } + }); // Tracks running and queued tasks. Cleared after a task completes. 
- private final ConcurrentMap knownTasks = - new ConcurrentHashMap<>(); + private final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); @VisibleForTesting final DelayQueue disabledNodes = new DelayQueue<>(); private final ContainerFactory containerFactory; private final Random random = new Random(); - private final int containerPort; private final Clock clock; private final ListeningExecutorService executor; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); - - // TODO Track resources used by this application on specific hosts, and make scheduling decisions accordingly. + // TODO Track resources used by this application on specific hosts, and make scheduling decisions + // accordingly. // Ideally implement in a way where updates from ZK, if they do come, can just be plugged in. // A heap based on available capacity - which is updated each time stats are updated, // or anytime assignment numbers are changed. Especially for random allocations (no host request). - // For non-random allocations - Walk through all pending tasks to get local assignments, then start assigning them to non local hosts. + // For non-random allocations - Walk through all pending tasks to get local assignments, then + // start assigning them to non local hosts. // Also setup a max over-subscribe limit as part of this. 
private final AtomicBoolean isStopped = new AtomicBoolean(false); private final long nodeReEnableTimeout; - // Per daemon private final int memoryPerInstance; private final int coresPerInstance; @@ -144,9 +132,9 @@ public int compare(Priority o1, Priority o2) { // Per Executor Thread private final Resource resourcePerExecutor; - private final boolean initFromRegistry; private final LlapRegistryService registry = new LlapRegistryService(); - private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable = new PendingTaskSchedulerCallable(); + private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable = + new PendingTaskSchedulerCallable(); private ListenableFuture pendingTaskSchedulerFuture; @VisibleForTesting @@ -156,14 +144,9 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); - - - - public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext, - String clientHostname, int clientPort, String trackingUrl, - long customAppIdIdentifier, - Configuration conf) { + String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier, + Configuration conf) { // Accepting configuration here to allow setting up fields as final super(LlapTaskSchedulerService.class.getName()); @@ -171,17 +154,18 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a this.appClientDelegate = createAppCallbackDelegate(appClient); this.clock = appContext.getClock(); this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier); - this.memoryPerInstance = conf - .getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + this.memoryPerInstance = + conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); - this.coresPerInstance = conf - .getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, + this.coresPerInstance = + 
conf.getInt(LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, LlapConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT); - this.executorsPerInstance = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, - LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); - this.nodeReEnableTimeout = conf.getLong( - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS, - LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT); + this.executorsPerInstance = + conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + this.nodeReEnableTimeout = + conf.getLong(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS, + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_NODE_REENABLE_TIMEOUT_MILLIS_DEFAULT); int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance); @@ -189,121 +173,39 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a String instanceId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - Preconditions.checkNotNull(instanceId, - LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + " must be defined"); - - if (!instanceId.startsWith("@")) { // Manual setup. Not via the service registry - initFromRegistry = false; - String[] hosts = conf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - Preconditions.checkState(hosts != null && hosts.length != 0, - LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + "must be defined"); - for (String host : hosts) { - // If reading addresses from conf, try resolving local addresses so that - // this matches with the address reported by daemons. 
- InetAddress inetAddress = null; - try { - inetAddress = InetAddress.getByName(host); - if (NetUtils.isLocalAddress(inetAddress)) { - InetSocketAddress socketAddress = new InetSocketAddress(0); - socketAddress = NetUtils.getConnectAddress(socketAddress); - LOG.info("Adding host identified as local: " + host + " as " + socketAddress.getHostName()); - host = socketAddress.getHostName(); - } - } catch (UnknownHostException e) { - LOG.warn("Ignoring resolution issues for host: " + host, e); - } - NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); - activeHosts.put(host, nodeInfo); - allHosts.put(host, nodeInfo); - } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); - } else { - initFromRegistry = true; - } + Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + + " must be defined"); - this.containerPort = conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - ExecutorService executorService = Executors.newFixedThreadPool(1, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); + ExecutorService executorService = + Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); executor = MoreExecutors.listeningDecorator(executorService); - if (activeHosts.size() > 0) { - LOG.info("Running with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + allHosts.keySet() + - ", rpcPort=" + containerPort + - ", nodeReEnableTimeout=" + nodeReEnableTimeout + - ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR); - } else { - LOG.info("Running with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + 
executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + - ", rpcPort=" + - ", nodeReEnableTimeout=" + nodeReEnableTimeout + - ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR); - } - + LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor + + ", nodeReEnableTimeout=" + nodeReEnableTimeout + ", nodeReEnableBackOffFactor=" + + BACKOFF_FACTOR); } @Override public void serviceInit(Configuration conf) { - if (initFromRegistry) { - registry.init(conf); - } + registry.init(conf); } - @Override public void serviceStart() throws IOException { - writeLock.lock(); try { pendingTaskSchedulerFuture = executor.submit(pendingTaskSchedulerCallable); - if (initFromRegistry) { - registry.start(); - if (activeHosts.size() > 0) { - return; - } - LOG.info("Reading YARN registry for service records"); - - Map workers = registry.getWorkers(); - for (ServiceRecord srv : workers.values()) { - Endpoint rpc = srv.getInternalEndpoint("llap"); - if (rpc != null) { - LOG.info("Examining endpoint: " + rpc); - final String host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); - activeHosts.put(host, nodeInfo); - allHosts.put(host, nodeInfo); - } else { - - LOG.info("The SRV record was " + srv); - } - } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); - - - - LOG.info("Re-inited with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + allHosts.keySet()); - + registry.start(); + activeInstances = registry.getInstances(); + for (ServiceInstance inst : 
activeInstances.getAll().values()) { + addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock)); } } finally { writeLock.unlock(); } - } @Override @@ -316,7 +218,7 @@ public void serviceStop() { pendingTaskSchedulerFuture.cancel(true); } executor.shutdownNow(); - if (initFromRegistry) { + if (registry != null) { registry.stop(); } appCallbackExecutor.shutdownNow(); @@ -327,18 +229,68 @@ public void serviceStop() { } @Override + public Resource getTotalResources() { + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : activeInstances.getAll().values()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + LOG.info("Found instance " + inst + " with " + r); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } else { + LOG.info("Ignoring dead instance " + inst); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); + } + + /** + * The difference between this and getTotalResources() is that this only gives currently free + * resource instances, while the other lists all the instances that may become available in a + * while. + */ + @Override public Resource getAvailableResources() { - // TODO This needs information about all running executors, and the amount of memory etc available across the cluster. 
- // No lock required until this moves to using something other than allHosts - return Resource - .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance), - allHosts.size() * coresPerInstance); + // need a state store eventually for current state & measure backoffs + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : instanceToNodeMap.keySet()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); } @Override public int getClusterNodeCount() { - // No lock required until this moves to using something other than allHosts - return allHosts.size(); + readLock.lock(); + try { + int n = 0; + for (ServiceInstance inst : activeInstances.getAll().values()) { + if (inst.isAlive()) { + n++; + } + } + return n; + } finally { + readLock.unlock(); + } } @Override @@ -350,14 +302,6 @@ public void resetMatchLocalityForAllHeldContainers() { } @Override - public Resource getTotalResources() { - // No lock required until this moves to using something other than allHosts - return Resource - .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance), - allHosts.size() * coresPerInstance); - } - - @Override public void blacklistNode(NodeId nodeId) { LOG.info("DEBUG: BlacklistNode not supported"); } @@ -369,8 +313,9 @@ public void unblacklistNode(NodeId nodeId) { @Override public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, - Priority priority, Object containerSignature, Object clientCookie) { - TaskInfo taskInfo = new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime()); + Priority priority, Object containerSignature, Object clientCookie) { + TaskInfo taskInfo = + new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime()); writeLock.lock(); try { 
dagStats.registerTaskRequest(hosts, racks); @@ -383,13 +328,13 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin } } - @Override public void allocateTask(Object task, Resource capability, ContainerId containerId, - Priority priority, Object containerSignature, Object clientCookie) { + Priority priority, Object containerSignature, Object clientCookie) { // Container affinity can be implemented as Host affinity for LLAP. Not required until // 1:1 edges are used in Hive. - TaskInfo taskInfo = new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime()); + TaskInfo taskInfo = + new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime()); writeLock.lock(); try { dagStats.registerTaskRequest(null, null); @@ -411,43 +356,49 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd try { taskInfo = knownTasks.remove(task); if (taskInfo == null) { - LOG.error("Could not determine ContainerId for task: " + task + - " . Could have hit a race condition. Ignoring." + - " The query may hang since this \"unknown\" container is now taking up a slot permanently"); + LOG.error("Could not determine ContainerId for task: " + + task + + " . Could have hit a race condition. Ignoring." + + " The query may hang since this \"unknown\" container is now taking up a slot permanently"); return false; } if (taskInfo.containerId == null) { if (taskInfo.assigned) { - LOG.error( - "Task: " + task + " assigned, but could not find the corresponding containerId." + - " The query may hang since this \"unknown\" container is now taking up a slot permanently"); + LOG.error("Task: " + + task + + " assigned, but could not find the corresponding containerId." 
+ + " The query may hang since this \"unknown\" container is now taking up a slot permanently"); } else { - LOG.info("Ignoring deallocate request for task " + task + - " which hasn't been assigned to a container"); + LOG.info("Ignoring deallocate request for task " + task + + " which hasn't been assigned to a container"); removePendingTask(taskInfo); } return false; } - String hostForContainer = containerToHostMap.remove(taskInfo.containerId); + String hostForContainer = containerToInstanceMap.remove(taskInfo.containerId); assert hostForContainer != null; - String assignedHost = taskInfo.assignedHost; - assert assignedHost != null; + ServiceInstance assignedInstance = taskInfo.assignedInstance; + assert assignedInstance != null; if (taskSucceeded) { - // The node may have been blacklisted at this point - which means it may not be in the activeNodeList. - NodeInfo nodeInfo = allHosts.get(assignedHost); + // The node may have been blacklisted at this point - which means it may not be in the + // activeNodeList. + NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); assert nodeInfo != null; nodeInfo.registerTaskSuccess(); - // TODO Consider un-blacklisting the node since at least 1 slot should have become available on the node. - } else if (!taskSucceeded && endReason != null && EnumSet - .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) - .contains(endReason)) { + // TODO Consider un-blacklisting the node since at least 1 slot should have become available + // on the node. 
+ } else if (!taskSucceeded + && endReason != null + && EnumSet + .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) + .contains(endReason)) { if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { - dagStats.registerCommFailure(taskInfo.assignedHost); + dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { - dagStats.registerTaskRejected(taskInfo.assignedHost); + dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); } - disableNode(assignedHost); + disableInstance(assignedInstance, endReason == TaskAttemptEndReason.SERVICE_BUSY); } } finally { writeLock.unlock(); @@ -479,87 +430,137 @@ private ExecutorService createAppCallbackExecutorService() { } @VisibleForTesting - TaskSchedulerAppCallback createAppCallbackDelegate( - TaskSchedulerAppCallback realAppClient) { - return new TaskSchedulerAppCallbackWrapper(realAppClient, - appCallbackExecutor); + TaskSchedulerAppCallback createAppCallbackDelegate(TaskSchedulerAppCallback realAppClient) { + return new TaskSchedulerAppCallbackWrapper(realAppClient, appCallbackExecutor); } /** * @param requestedHosts the list of preferred hosts. null implies any host * @return */ - private String selectHost(String[] requestedHosts) { - // TODO Change this to work off of what we think is remaining capacity for a host - + private ServiceInstance selectHost(TaskInfo request) { + String[] requestedHosts = request.requestedHosts; readLock.lock(); // Read-lock. Not updating any stats at the moment. try { - // Check if any hosts are active. If there's any active host, an allocation will happen. - if (activeHosts.size() == 0) { + // Check if any hosts are active. 
+ if (getAvailableResources().getMemory() <= 0) { + refreshInstances(); + } + + // If there's no memory available, fail + if (getTotalResources().getMemory() <= 0) { return null; } - String host = null; - if (requestedHosts != null && requestedHosts.length > 0) { - // Pick the first host always. Weak attempt at cache affinity. - host = requestedHosts[0]; - if (activeHosts.get(host) != null) { - LOG.info("Selected host: " + host + " from requested hosts: " + - Arrays.toString(requestedHosts)); - } else { - LOG.info("Preferred host: " + host + " not present. Attempting to select another one"); - host = null; - for (String h : requestedHosts) { - if (activeHosts.get(h) != null) { - host = h; - break; + if (requestedHosts != null) { + for (String host : requestedHosts) { + // Pick the first host always. Weak attempt at cache affinity. + Set instances = activeInstances.getByHost(host); + if (!instances.isEmpty()) { + for (ServiceInstance inst : instances) { + if (inst.isAlive() && instanceToNodeMap.containsKey(inst)) { + // only allocate from the "available" list + // TODO Change this to work off of what we think is remaining capacity for an + // instance + LOG.info("Assigning " + inst + " when looking for " + host); + return inst; + } } } - if (host == null) { - host = activeHostList[random.nextInt(activeHostList.length)]; - LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + - " not present. 
Randomizing the host"); + } + } + /* fall through - miss in locality (random scheduling) */ + ServiceInstance[] all = instanceToNodeMap.keySet().toArray(new ServiceInstance[0]); + // Check again + if (all.length > 0) { + int n = random.nextInt(all.length); + // start at random offset and iterate whole list + for (int i = 0; i < all.length; i++) { + ServiceInstance inst = all[(i + n) % all.length]; + if (inst.isAlive()) { + LOG.info("Assigning " + inst + " when looking for any host"); + return inst; } } - } else { - host = activeHostList[random.nextInt(activeHostList.length)]; - LOG.info("Selected random host: " + host + " since the request contained no host information"); } - return host; } finally { readLock.unlock(); } + + /* check again whether nodes are disabled or just missing */ + writeLock.lock(); + try { + for (ServiceInstance inst : activeInstances.getAll().values()) { + if (inst.isAlive() && instanceBlackList.contains(inst) == false + && instanceToNodeMap.containsKey(inst) == false) { + /* that's a good node, not added to the allocations yet */ + addNode(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock)); + // mark it as disabled to let the pending tasks go there + disableInstance(inst, true); + } + } + /* do not allocate nodes from this process, as then the pending tasks will get starved */ + } finally { + writeLock.unlock(); + } + return null; } + private void refreshInstances() { + try { + activeInstances.refresh(); // handles its own sync + } catch (IOException ioe) { + LOG.warn("Could not refresh list of active instances", ioe); + } + } + + private void addNode(ServiceInstance inst, NodeInfo node) { + instanceToNodeMap.put(inst, node); + } private void reenableDisabledNode(NodeInfo nodeInfo) { writeLock.lock(); try { - nodeInfo.enableNode(); - activeHosts.put(nodeInfo.hostname, nodeInfo); - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); + if (!nodeInfo.isBusy()) { + refreshInstances(); + } + if (nodeInfo.host.isAlive()) 
{ + nodeInfo.enableNode(); + instanceBlackList.remove(nodeInfo.host); + instanceToNodeMap.put(nodeInfo.host, nodeInfo); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Removing dead node " + nodeInfo); + } + } } finally { writeLock.unlock(); } } - private void disableNode(String hostname) { + private void disableInstance(ServiceInstance instance, boolean busy) { writeLock.lock(); try { - NodeInfo nodeInfo = activeHosts.remove(hostname); + NodeInfo nodeInfo = instanceToNodeMap.remove(instance); if (nodeInfo == null) { - LOG.debug("Node: " + hostname + " already disabled, or invalid. Not doing anything."); + if (LOG.isDebugEnabled()) { + LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything."); + } } else { + instanceBlackList.add(instance); nodeInfo.disableNode(nodeReEnableTimeout); + nodeInfo.setBusy(busy); // daemon failure vs daemon busy + // TODO: handle task to container map events in case of hard failures disabledNodes.add(nodeInfo); + if (LOG.isInfoEnabled()) { + LOG.info("Disabling instance " + instance + " for " + nodeReEnableTimeout + " seconds"); + } } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); } finally { writeLock.unlock(); } } - private void addPendingTask(TaskInfo taskInfo) { writeLock.lock(); try { @@ -582,8 +583,8 @@ private void removePendingTask(TaskInfo taskInfo) { Priority priority = taskInfo.priority; List taskInfoList = pendingTasks.get(priority); if (taskInfoList == null || taskInfoList.isEmpty() || !taskInfoList.remove(taskInfo)) { - LOG.warn( - "Could not find task: " + taskInfo.task + " in pending list, at priority: " + priority); + LOG.warn("Could not find task: " + taskInfo.task + " in pending list, at priority: " + + priority); } } finally { writeLock.unlock(); @@ -593,7 +594,8 @@ private void removePendingTask(TaskInfo taskInfo) { private void schedulePendingTasks() { writeLock.lock(); try { - Iterator>> pendingIterator = pendingTasks.entrySet().iterator(); + 
Iterator>> pendingIterator = + pendingTasks.entrySet().iterator(); while (pendingIterator.hasNext()) { Entry> entry = pendingIterator.next(); List taskListAtPriority = entry.getValue(); @@ -625,18 +627,20 @@ private void schedulePendingTasks() { } private boolean scheduleTask(TaskInfo taskInfo) { - String host = selectHost(taskInfo.requestedHosts); + ServiceInstance host = selectHost(taskInfo); if (host == null) { return false; } else { Container container = - containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host, containerPort); + containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host.getHost(), + host.getRpcPort()); writeLock.lock(); // While updating local structures try { - dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, host); + dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, + host.getHost()); taskInfo.setAssignmentInfo(host, container.getId()); knownTasks.putIfAbsent(taskInfo.task, taskInfo); - containerToHostMap.put(container.getId(), host); + containerToInstanceMap.put(container.getId(), host.getWorkerIdentity()); } finally { writeLock.unlock(); } @@ -683,16 +687,17 @@ public void shutdown() { @VisibleForTesting static class NodeInfo implements Delayed { private final float constBackOffFactor; - final String hostname; + final ServiceInstance host; private final Clock clock; long expireTimeMillis = -1; private long numSuccessfulTasks = 0; private long numSuccessfulTasksAtLastBlacklist = -1; float cumulativeBackoffFactor = 1.0f; + private boolean busy; - NodeInfo(String hostname, float backoffFactor, Clock clock) { - this.hostname = hostname; + NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) { + this.host = host; constBackOffFactor = backoffFactor; this.clock = clock; } @@ -716,9 +721,18 @@ void disableNode(long duration) { } void registerTaskSuccess() { + this.busy = false; // if a task exited, we might have free slots 
numSuccessfulTasks++; } + public void setBusy(boolean busy) { + this.busy = busy; + } + + public boolean isBusy() { + return busy; + } + @Override public long getDelay(TimeUnit unit) { return expireTimeMillis - clock.getTime(); @@ -738,18 +752,13 @@ public int compareTo(Delayed o) { @Override public String toString() { - return "NodeInfo{" + - "constBackOffFactor=" + constBackOffFactor + - ", hostname='" + hostname + '\'' + - ", expireTimeMillis=" + expireTimeMillis + - ", numSuccessfulTasks=" + numSuccessfulTasks + - ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist + - ", cumulativeBackoffFactor=" + cumulativeBackoffFactor + - '}'; + return "NodeInfo{" + "constBackOffFactor=" + constBackOffFactor + ", host=" + host + + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" + numSuccessfulTasks + + ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist + + ", cumulativeBackoffFactor=" + cumulativeBackoffFactor + '}'; } } - @VisibleForTesting static class StatsPerDag { int numRequestedAllocations = 0; @@ -775,12 +784,13 @@ public String toString() { sb.append("NumRejectedTasks=").append(numRejectedTasks).append(", "); sb.append("NumCommFailures=").append(numCommFailures).append(", "); sb.append("NumDelayedAllocations=").append(numDelayedAllocations).append(", "); - sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost).append(", "); + sb.append("LocalityBasedAllocationsPerHost=").append(localityBasedNumAllocationsPerHost) + .append(", "); sb.append("NumAllocationsPerHost=").append(numAllocationsPerHost); return sb.toString(); } - void registerTaskRequest(String []requestedHosts, String[] requestedRacks) { + void registerTaskRequest(String[] requestedHosts, String[] requestedRacks) { numRequestedAllocations++; // TODO Change after HIVE-9987. For now, there's no rack matching. 
if (requestedHosts != null && requestedHosts.length != 0) { @@ -790,7 +800,8 @@ void registerTaskRequest(String []requestedHosts, String[] requestedRacks) { } } - void registerTaskAllocated(String[] requestedHosts, String [] requestedRacks, String allocatedHost) { + void registerTaskAllocated(String[] requestedHosts, String[] requestedRacks, + String allocatedHost) { // TODO Change after HIVE-9987. For now, there's no rack matching. if (requestedHosts != null && requestedHosts.length != 0) { Set requestedHostSet = new HashSet<>(Arrays.asList(requestedHosts)); @@ -837,12 +848,11 @@ private void _registerAllocationInHostMap(String host, Map 0 } if not exists(output): diff --git llap-server/src/main/resources/templates.py llap-server/src/main/resources/templates.py index 945754a..173aefb 100644 --- llap-server/src/main/resources/templates.py +++ llap-server/src/main/resources/templates.py @@ -78,7 +78,11 @@ "site.global.daemon_args": "%(daemon_args)s", "site.global.library_path": "%(hadoop_home)s/lib/native", "site.global.memory_val": "%(heap)d", - "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid" + "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid", + "internal.chaos.monkey.probability.amlaunchfailure": "0", + "internal.chaos.monkey.probability.containerfailure": "%(monkey_percentage)d", + "internal.chaos.monkey.interval.seconds": "%(monkey_interval)d", + "internal.chaos.monkey.enabled": "%(monkey_enabled)s" }, "components": { "slider-appmaster": { @@ -115,5 +119,4 @@ slider destroy %(name)s slider install-package --name LLAP --package $BASEDIR/llap-%(version)s.zip --replacepkg slider create %(name)s --resources $BASEDIR/resources.json --template $BASEDIR/appConfig.json -slider status %(name)s """ diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index efd4c50..1319677 100644 --- 
llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -46,6 +46,7 @@ import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mortbay.log.Log; public class TestLlapTaskSchedulerService { @@ -96,7 +97,6 @@ public void testNodeDisabled() { try { Priority priority1 = Priority.newInstance(1); String[] hosts1 = new String[]{HOST1}; - Object task1 = new Object(); Object clientCookie1 = new Object(); tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); @@ -111,11 +111,10 @@ public void testNodeDisabled() { // Verify that the node is blacklisted assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks); - assertEquals(2, tsWrapper.ts.activeHosts.size()); - assertEquals(2, tsWrapper.ts.activeHostList.length); + assertEquals(2, tsWrapper.ts.instanceToNodeMap.size()); LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodes.peek(); assertNotNull(disabledNodeInfo); - assertEquals(HOST1, disabledNodeInfo.hostname); + assertEquals(HOST1, disabledNodeInfo.host.getHost()); assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.NANOSECONDS)); assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis); @@ -164,8 +163,7 @@ public void testNodeReEnabled() throws InterruptedException { // Verify that the node is blacklisted assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks); - assertEquals(0, tsWrapper.ts.activeHosts.size()); - assertEquals(0, tsWrapper.ts.activeHostList.length); + assertEquals(0, tsWrapper.ts.instanceToNodeMap.size()); assertEquals(3, tsWrapper.ts.disabledNodes.size());