diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java index 4517375..32ba706 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java @@ -84,6 +84,9 @@ public LlapOptionsProcessor() { options.addOption(OptionBuilder.hasArg().withArgName("loglevel").withLongOpt("loglevel") .withDescription("log levels for the llap instance").create('l')); + + options.addOption(OptionBuilder.hasArg().withArgName("chaosmonkey").withLongOpt("chaosmonkey") + .withDescription("chaosmonkey interval").create('m')); // [-H|--help] options.addOption(new Option("H", "help", false, "Print help information")); @@ -101,7 +104,7 @@ public LlapOptions processOptions(String argv[]) throws ParseException { String directory = commandLine.getOptionValue("directory"); String name = commandLine.getOptionValue("name", null); - // loglevel & args are parsed by the python processor + // loglevel, chaosmonkey & args are parsed by the python processor return new LlapOptions(name, instances, directory); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java new file mode 100644 index 0000000..eb2fead --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.Resource; + +public interface ServiceInstance { + + /** + * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought + * back on the same host/port + */ + public String getWorkerIdentity(); + + /** + * Hostname of the service instance + * + * @return + */ + public String getHost(); + + /** + * RPC Endpoint for service instance + * + * @return + */ + public int getRpcPort(); + + /** + * Shuffle Endpoint for service instance + * + * @return + */ + public int getShufflePort(); + + /** + * Return the last known state (without refreshing) + * + * @return + */ + + public boolean isAlive(); + + /** + * Config properties of the Service Instance (llap.daemon.*) + * @return + */ + + public Map getProperties(); + + /** + * Memory and Executors available for the LLAP tasks + * + * This does not include the size of the cache or the actual vCores allocated via Slider. 
+ * + * @return + */ + public Resource getResource(); +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java new file mode 100644 index 0000000..f459e89 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public interface ServiceInstanceSet { + + /** + * Get an instance mapping which map worker identity to each instance. + * + * The worker identity does not collide between restarts, so each restart will have a unique id, + * while having the same host/ip pair. + * + * @return + */ + public Map getInstances(); + + /** + * Get an instance by worker identity. + * + * @param name + * @return + */ + public ServiceInstance getInstance(String name); + + /** + * Get a list of service instances for a given host. + * + * The list could include dead and alive instances. + * @param host + * @return + */ + public Set getByHost(String host); + + /** + * Refresh the instance set from registry backing store. 
+ * + * @throws IOException + */ + public void refresh() throws IOException; + +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java new file mode 100644 index 0000000..008cc83 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; + +/** + * ServiceRegistry interface for switching between fixed host and dynamic registry implementations. + */ +public interface ServiceRegistry { + + /** + * Start the service registry + * @throws InterruptedException + */ + public void start() throws InterruptedException; + + /** + * Stop the service registry + * @throws InterruptedException + */ + public void stop() throws InterruptedException; + + /** + * Register the current instance - the implementation takes care of the endpoints to register. + * @throws IOException + */ + public void register() throws IOException; + + /** + * Remove the current registration cleanly (implementation defined cleanup) + * @throws IOException + */ + public void unregister() throws IOException; + + /** + * Client API to get the list of instances registered via the current registry key. 
+ * + * @param component + * @return + * @throws IOException + */ + public ServiceInstanceSet getInstances(String component) throws IOException; +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java new file mode 100644 index 0000000..35e882c --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java @@ -0,0 +1,184 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.daemon.registry.impl; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Resource; + +public class LlapFixedRegistryImpl implements ServiceRegistry { + private final int port; + private final int shuffle; + private final String[] hosts; + private final int memory; + private final int vcores; + + private final Map srv = new HashMap(); + + public LlapFixedRegistryImpl(String hosts, Configuration conf) { + this.hosts = hosts.split(","); + this.port = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + this.shuffle = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + + for (Map.Entry kv : conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.put(kv.getKey(), kv.getValue()); + } + } + + this.memory = + conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); + this.vcores = + conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + } + + @Override + public void start() throws InterruptedException { + // nothing to start + } + + @Override + public void stop() throws InterruptedException { + // nothing to stop + } + + @Override + public void register() 
throws IOException { + // nothing to register + } + + @Override + public void unregister() throws IOException { + // nothing to unregister + } + + public static String getWorkerIdentity(String host) { + // trigger clean errors for anyone who mixes up identity with hosts + return "host-" + host; + } + + private final class FixedServiceInstance implements ServiceInstance { + + private final String host; + + public FixedServiceInstance(String host) { + this.host = host; + } + + @Override + public String getWorkerIdentity() { + return LlapFixedRegistryImpl.getWorkerIdentity(host); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + // TODO: allow >1 port per host? + return LlapFixedRegistryImpl.this.port; + } + + @Override + public int getShufflePort() { + return LlapFixedRegistryImpl.this.shuffle; + } + + @Override + public boolean isAlive() { + return true; + } + + @Override + public Map getProperties() { + Map properties = new HashMap<>(srv); + // no worker identity + return properties; + } + + @Override + public Resource getResource() { + return Resource.newInstance(memory, vcores); + } + + } + + private final class FixedServiceInstanceSet implements ServiceInstanceSet { + + private final Map instances = new HashMap(); + + public FixedServiceInstanceSet() { + for (String host : hosts) { + // trigger bugs in anyone who uses this as a hostname + instances.put(getWorkerIdentity(host), new FixedServiceInstance(host)); + } + } + + @Override + public Map getInstances() { + return instances; + } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public Set getByHost(String host) { + Set byHost = new HashSet(); + ServiceInstance inst = getInstance(getWorkerIdentity(host)); + if (inst != null) { + byHost.add(inst); + } + return byHost; + } + + @Override + public void refresh() throws IOException { + // I will do no such thing + } + + } + + @Override + 
public ServiceInstanceSet getInstances(String component) throws IOException { + return new FixedServiceInstanceSet(); + } + + @Override + public String toString() { + return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts)); + } +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java index 67596fe..79042c5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java @@ -1,16 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.hive.llap.daemon.registry.impl; import java.io.IOException; -import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; -import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; @@ -30,25 +43,8 @@ public class LlapRegistryService extends AbstractService { private static final Logger LOG = Logger.getLogger(LlapRegistryService.class); - - public final static String SERVICE_CLASS = "org-apache-hive"; - - private RegistryOperationsService client; - private String instanceName; - private Configuration conf; - private ServiceRecordMarshal encoder; - - private static final String hostname; - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } + private ServiceRegistry registry = null; public LlapRegistryService() { super("LlapRegistryService"); @@ -56,92 +52,48 @@ public LlapRegistryService() { @Override public void serviceInit(Configuration conf) { - String registryId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - if (registryId.startsWith("@")) { - LOG.info("Llap Registry is enabled with registryid: " + registryId); - this.conf = new Configuration(conf); - conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - // registry reference - instanceName = 
registryId.substring(1); - client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); - encoder = new RegistryUtils.ServiceRecordMarshal(); - + String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); + if (hosts.startsWith("@")) { + registry = initRegistry(hosts.substring(1), conf); } else { - LOG.info("Llap Registry is disabled"); + registry = new LlapFixedRegistryImpl(hosts, conf); } + LOG.info("Using LLAP registry type " + registry); + } + + private ServiceRegistry initRegistry(String instanceName, Configuration conf) { + return new LlapYarnRegistryImpl(instanceName, conf); } @Override public void serviceStart() throws Exception { - if (client != null) { - client.start(); + if (this.registry != null) { + this.registry.start(); } } + @Override public void serviceStop() throws Exception { - if (client != null) { - client.stop(); + if (this.registry != null) { + this.registry.stop(); + } else { + LOG.warn("Stopping non-existent registry service"); } } - public Endpoint getRpcEndpoint() { - final int rpcPort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - - return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); - } - - public Endpoint getShuffleEndpoint() { - final int shufflePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); - // HTTP today, but might not be - return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, - shufflePort); - } - - private final String getPath() { - return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), - SERVICE_CLASS, instanceName, "workers"), "worker-"); - } - public void registerWorker() throws IOException { - if (this.client != null) { - String path = getPath(); - ServiceRecord srv = new ServiceRecord(); - srv.addInternalEndpoint(getRpcEndpoint()); - 
srv.addInternalEndpoint(getShuffleEndpoint()); - - for (Map.Entry kv : this.conf) { - if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)) { - // TODO: read this somewhere useful, like the allocator - srv.set(kv.getKey(), kv.getValue()); - } - } - - client.mknode(RegistryPathUtils.parentOf(path), true); - - // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths - client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), - client.getClientAcls()); + if (this.registry != null) { + this.registry.register(); } } - public void unregisterWorker() { - if (this.client != null) { - // with ephemeral nodes, there's nothing to do here - // because the create didn't return paths + public void unregisterWorker() throws IOException { + if (this.registry != null) { + this.registry.unregister(); } } - public Map getWorkers() throws IOException { - if (this.client != null) { - String path = getPath(); - return RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); - } else { - Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); - return null; - } + public ServiceInstanceSet getInstances() throws IOException { + return this.registry.getInstances("LLAP"); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java new file mode 100644 index 0000000..297b9aa --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java @@ -0,0 +1,291 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import 
org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; + +import com.google.common.base.Preconditions; + +public class LlapYarnRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class); + + private RegistryOperationsService client; + private String instanceName; + private Configuration conf; + private ServiceRecordMarshal encoder; + + private static final UUID uniq = UUID.randomUUID(); + private static final String hostname; + + private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; + + private final static String SERVICE_CLASS = "org-apache-hive"; + + final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1); + + static { + String localhost = "localhost"; + try { + localhost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException uhe) { + // ignore + } + hostname = localhost; + } + + public LlapYarnRegistryImpl(String instanceName, Configuration conf) { + + LOG.info("Llap Registry is enabled with registryid: " + instanceName); + this.conf = new Configuration(conf); + this.instanceName = instanceName; + conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + // registry reference + client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); + encoder = new RegistryUtils.ServiceRecordMarshal(); + + } + + public Endpoint getRpcEndpoint() { + final int rpcPort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); + } + + public Endpoint getShuffleEndpoint() { + final int shufflePort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + 
LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + // HTTP today, but might not be + return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, + shufflePort); + } + + private final String getPath() { + return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), + SERVICE_CLASS, instanceName, "workers"), "worker-"); + } + + @Override + public void register() throws IOException { + String path = getPath(); + ServiceRecord srv = new ServiceRecord(); + srv.addInternalEndpoint(getRpcEndpoint()); + srv.addInternalEndpoint(getShuffleEndpoint()); + + for (Map.Entry kv : this.conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.set(kv.getKey(), kv.getValue()); + } + } + + // restart sensitive instance id + srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + + client.mknode(RegistryPathUtils.parentOf(path), true); + + // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths + client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), + client.getClientAcls()); + } + + @Override + public void unregister() throws IOException { + // Nothing for the zkCreate models + } + + private class DynamicServiceInstance implements ServiceInstance { + private final ServiceRecord srv; + private boolean alive = true; + private final String host; + private final int rpcPort; + private final int shufflePort; + + public DynamicServiceInstance(ServiceRecord srv) throws IOException { + this.srv = srv; + + final Endpoint shuffle = srv.getInternalEndpoint("shuffle"); + final Endpoint rpc = srv.getInternalEndpoint("llap"); + + this.host = + RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = + Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + 
this.shufflePort = + Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + + @Override + public String getWorkerIdentity() { + return srv.get(UNIQUE_IDENTIFIER); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public int getShufflePort() { + return shufflePort; + } + + @Override + public boolean isAlive() { + return alive ; + } + + public void kill() { + this.alive = false; + } + + @Override + public Map getProperties() { + return srv.attributes(); + } + + @Override + public Resource getResource() { + int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); + int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS)); + return Resource.newInstance(memory, vCores); + } + } + + private class DynamicServiceInstanceSet implements ServiceInstanceSet { + + Map instances; + + @Override + public Map getInstances() { + return instances; + } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public synchronized void refresh() throws IOException { + /* call this from wherever */ + Map freshInstances = new HashMap(); + + String path = getPath(); + Map records = + RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); + + for (ServiceRecord rec : records.values()) { + ServiceInstance instance = new DynamicServiceInstance(rec); + freshInstances.put(instance.getWorkerIdentity(), instance); + } + + if (instances != null) { + // copy the key set: keySet() is a live view and removeAll() would mutate the current map + Set oldKeys = new HashSet(instances.keySet()); + Set newKeys = freshInstances.keySet(); + // kill every instance present in the old set but missing from the new one, + // even when none of the old keys survived the refresh + oldKeys.removeAll(newKeys); + for (String k : oldKeys) { + // this is so that people can hold onto ServiceInstance references as placeholders for tasks + ((DynamicServiceInstance) instances.get(k)).kill(); + } + } + instances = freshInstances; + } + + @Override + public 
Set getByHost(String host) { + Set byHost = new HashSet(); + for (ServiceInstance i : instances.values()) { + if (host.equals(i.getHost())) { + // all hosts in instances should be alive in this impl + byHost.add(i); + } + } + return byHost; + } + } + + final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet(); + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component + if (this.client != null) { + instances.refresh(); + return instances; + } else { + Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); + return null; + } + } + + @Override + public void start() { + if (client != null) { + client.start(); + } + } + + @Override + public void stop() { + if (client != null) { + client.stop(); + } + } +} diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index a50b159..bee508e 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -15,16 +15,12 @@ package org.apache.tez.dag.app.rm; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.Arrays; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -42,24 +38,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import 
com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; -import org.apache.hadoop.registry.client.types.AddressTypes; -import org.apache.hadoop.registry.client.types.Endpoint; -import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -67,12 +55,16 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import 
com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapTaskSchedulerService extends TaskSchedulerService { @@ -83,19 +75,16 @@ private final ExecutorService appCallbackExecutor; private final TaskSchedulerAppCallback appClientDelegate; - // Set of active hosts - @VisibleForTesting - final LinkedHashMap activeHosts = new LinkedHashMap<>(); - // Populated each time activeHosts is modified - @VisibleForTesting - String []activeHostList; + // interface into the registry service + private ServiceInstanceSet activeInstances; - // Set of all hosts in the system. @VisibleForTesting - final ConcurrentMap allHosts = new ConcurrentHashMap<>(); + final Map instanceToNodeMap = new HashMap<>(); + + @VisibleForTesting // Tracks currently allocated containers. - private final Map containerToHostMap = new HashMap<>(); + final Map containerToInstanceMap = new HashMap<>(); // Tracks tasks which could not be allocated immediately. @VisibleForTesting @@ -116,7 +105,6 @@ public int compare(Priority o1, Priority o2) { private final ContainerFactory containerFactory; private final Random random = new Random(); - private final int containerPort; private final Clock clock; private final ListeningExecutorService executor; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -144,7 +132,6 @@ public int compare(Priority o1, Priority o2) { // Per Executor Thread private final Resource resourcePerExecutor; - private final boolean initFromRegistry; private final LlapRegistryService registry = new LlapRegistryService(); private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable = new PendingTaskSchedulerCallable(); private ListenableFuture pendingTaskSchedulerFuture; @@ -156,10 +143,6 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); - - - - public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext 
appContext, String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier, @@ -192,118 +175,39 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + " must be defined"); - if (!instanceId.startsWith("@")) { // Manual setup. Not via the service registry - initFromRegistry = false; - String[] hosts = conf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - Preconditions.checkState(hosts != null && hosts.length != 0, - LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + "must be defined"); - for (String host : hosts) { - // If reading addresses from conf, try resolving local addresses so that - // this matches with the address reported by daemons. - InetAddress inetAddress = null; - try { - inetAddress = InetAddress.getByName(host); - if (NetUtils.isLocalAddress(inetAddress)) { - InetSocketAddress socketAddress = new InetSocketAddress(0); - socketAddress = NetUtils.getConnectAddress(socketAddress); - LOG.info("Adding host identified as local: " + host + " as " + socketAddress.getHostName()); - host = socketAddress.getHostName(); - } - } catch (UnknownHostException e) { - LOG.warn("Ignoring resolution issues for host: " + host, e); - } - NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); - activeHosts.put(host, nodeInfo); - allHosts.put(host, nodeInfo); - } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); - } else { - initFromRegistry = true; - } - - this.containerPort = conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); executor = MoreExecutors.listeningDecorator(executorService); - if (activeHosts.size() > 0) { - LOG.info("Running with configuration: " + + LOG.info("Running with 
configuration: " + "memoryPerInstance=" + memoryPerInstance + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + allHosts.keySet() + - ", rpcPort=" + containerPort + ", nodeReEnableTimeout=" + nodeReEnableTimeout + ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR); - } else { - LOG.info("Running with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + - ", rpcPort=" + - ", nodeReEnableTimeout=" + nodeReEnableTimeout + - ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR); - } - } @Override public void serviceInit(Configuration conf) { - if (initFromRegistry) { - registry.init(conf); - } + registry.init(conf); } @Override public void serviceStart() throws IOException { - writeLock.lock(); try { pendingTaskSchedulerFuture = executor.submit(pendingTaskSchedulerCallable); - if (initFromRegistry) { - registry.start(); - if (activeHosts.size() > 0) { - return; - } - LOG.info("Reading YARN registry for service records"); - - Map workers = registry.getWorkers(); - for (ServiceRecord srv : workers.values()) { - Endpoint rpc = srv.getInternalEndpoint("llap"); - if (rpc != null) { - LOG.info("Examining endpoint: " + rpc); - final String host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); - activeHosts.put(host, nodeInfo); - allHosts.put(host, nodeInfo); - } else { - - LOG.info("The SRV record was " + srv); - } - } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); - - - - LOG.info("Re-inited with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + 
executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + allHosts.keySet()); + registry.start(); + activeInstances = registry.getInstances(); + for (ServiceInstance inst : activeInstances.getInstances().values()) { + instanceToNodeMap.put(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock)); } } finally { writeLock.unlock(); } - } @Override @@ -316,7 +220,7 @@ public void serviceStop() { pendingTaskSchedulerFuture.cancel(true); } executor.shutdownNow(); - if (initFromRegistry) { + if (registry != null) { registry.stop(); } appCallbackExecutor.shutdownNow(); @@ -325,20 +229,69 @@ public void serviceStop() { writeLock.unlock(); } } + + + @Override + public Resource getTotalResources() { + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : activeInstances.getInstances().values()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); + } + + /** + * The difference between this and getTotalResources() is that this only gives + * currently free resource instances, while the other lists all the instances + * that may become available in a while. + */ @Override public Resource getAvailableResources() { - // TODO This needs information about all running executors, and the amount of memory etc available across the cluster. 
- // No lock required until this moves to using something other than allHosts - return Resource - .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance), - allHosts.size() * coresPerInstance); + // need a state store eventually for current state & measure backoffs + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : instanceToNodeMap.keySet()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); } @Override public int getClusterNodeCount() { - // No lock required until this moves to using something other than allHosts - return allHosts.size(); + readLock.lock(); + try { + int n = 0; + for (ServiceInstance inst : activeInstances.getInstances().values()) { + if (inst.isAlive()) { + n++; + } + } + return n; + } finally { + readLock.unlock(); + } } @Override @@ -350,14 +303,6 @@ public void resetMatchLocalityForAllHeldContainers() { } @Override - public Resource getTotalResources() { - // No lock required until this moves to using something other than allHosts - return Resource - .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance), - allHosts.size() * coresPerInstance); - } - - @Override public void blacklistNode(NodeId nodeId) { LOG.info("DEBUG: BlacklistNode not supported"); } @@ -428,14 +373,14 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } return false; } - String hostForContainer = containerToHostMap.remove(taskInfo.containerId); + String hostForContainer = containerToInstanceMap.remove(taskInfo.containerId); assert hostForContainer != null; - String assignedHost = taskInfo.assignedHost; - assert assignedHost != null; + ServiceInstance assignedInstance = taskInfo.assignedInstance; + assert assignedInstance != null; if (taskSucceeded) { // The node may have been blacklisted at this point - which 
means it may not be in the activeNodeList. - NodeInfo nodeInfo = allHosts.get(assignedHost); + NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); assert nodeInfo != null; nodeInfo.registerTaskSuccess(); // TODO Consider un-blacklisting the node since at least 1 slot should have become available on the node. @@ -443,11 +388,11 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) .contains(endReason)) { if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { - dagStats.registerCommFailure(taskInfo.assignedHost); + dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { - dagStats.registerTaskRejected(taskInfo.assignedHost); + dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); } - disableNode(assignedHost); + disableInstance(assignedInstance, endReason == TaskAttemptEndReason.SERVICE_BUSY); } } finally { writeLock.unlock(); @@ -489,71 +434,98 @@ TaskSchedulerAppCallback createAppCallbackDelegate( * @param requestedHosts the list of preferred hosts. null implies any host * @return */ - private String selectHost(String[] requestedHosts) { - // TODO Change this to work off of what we think is remaining capacity for a host - + private ServiceInstance selectHost(TaskInfo request) { + String[] requestedHosts = request.requestedHosts; readLock.lock(); // Read-lock. Not updating any stats at the moment. try { - // Check if any hosts are active. If there's any active host, an allocation will happen. - if (activeHosts.size() == 0) { + // Check if any hosts are active. 
+ if (getAvailableResources().getMemory() <= 0) { + refreshInstances(); + } + + // If there's no memory available, fail + if (getTotalResources().getMemory() <=0) { return null; } - String host = null; - if (requestedHosts != null && requestedHosts.length > 0) { + if (requestedHosts != null) { + for (String host : requestedHosts) { // Pick the first host always. Weak attempt at cache affinity. - host = requestedHosts[0]; - if (activeHosts.get(host) != null) { - LOG.info("Selected host: " + host + " from requested hosts: " + - Arrays.toString(requestedHosts)); - } else { - LOG.info("Preferred host: " + host + " not present. Attempting to select another one"); - host = null; - for (String h : requestedHosts) { - if (activeHosts.get(h) != null) { - host = h; - break; + Set instances = activeInstances.getByHost(host); + if (!instances.isEmpty()) { + for (ServiceInstance inst : instances) { + if (inst.isAlive() && instanceToNodeMap.containsKey(inst)) { + // only allocate from the "available" list + // TODO Change this to work off of what we think is remaining capacity for an instance + return inst; } } - if (host == null) { - host = activeHostList[random.nextInt(activeHostList.length)]; - LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + - " not present. 
Randomizing the host"); - } } - } else { - host = activeHostList[random.nextInt(activeHostList.length)]; - LOG.info("Selected random host: " + host + " since the request contained no host information"); + }} + /* fall through - miss in locality (random scheduling) */ + + // only allocate from the "available" list + ServiceInstance[] all = instanceToNodeMap.keySet().toArray(new ServiceInstance[0]); + + if (all.length == 0) { + return null; + } + + int n = random.nextInt(all.length); + // start at random offset and iterate whole list + for (int i = 0; i < all.length ; i++) { + ServiceInstance inst = all[(i+n) % all.length]; + if (inst.isAlive()) { + return inst; + } } - return host; } finally { readLock.unlock(); } + /* no matches */ + return null; } + private void refreshInstances() { + try { + activeInstances.refresh(); // handles its own sync + } catch(IOException ioe) { + LOG.warn("Could not refresh list of active instances", ioe); + } + } + private void reenableDisabledNode(NodeInfo nodeInfo) { writeLock.lock(); try { - nodeInfo.enableNode(); - activeHosts.put(nodeInfo.hostname, nodeInfo); - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); + if (!nodeInfo.isBusy()) { + refreshInstances(); + } + if (nodeInfo.host.isAlive()) { + nodeInfo.enableNode(); + instanceToNodeMap.put(nodeInfo.host, nodeInfo); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Removing dead node " + nodeInfo); + } + } } finally { writeLock.unlock(); } } - private void disableNode(String hostname) { + private void disableInstance(ServiceInstance instance, boolean busy) { writeLock.lock(); try { - NodeInfo nodeInfo = activeHosts.remove(hostname); + NodeInfo nodeInfo = instanceToNodeMap.remove(instance); if (nodeInfo == null) { - LOG.debug("Node: " + hostname + " already disabled, or invalid. Not doing anything."); + LOG.debug("Node: " + instance + " already disabled, or invalid. 
Not doing anything."); } else { nodeInfo.disableNode(nodeReEnableTimeout); + nodeInfo.setBusy(busy); // daemon failure vs daemon busy + // TODO: handle task to container map events in case of hard failures disabledNodes.add(nodeInfo); } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); } finally { writeLock.unlock(); } @@ -625,18 +597,18 @@ private void schedulePendingTasks() { } private boolean scheduleTask(TaskInfo taskInfo) { - String host = selectHost(taskInfo.requestedHosts); + ServiceInstance host = selectHost(taskInfo); if (host == null) { return false; } else { Container container = - containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host, containerPort); + containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host.getHost(), host.getRpcPort()); writeLock.lock(); // While updating local structures try { - dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, host); + dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, host.getHost()); taskInfo.setAssignmentInfo(host, container.getId()); knownTasks.putIfAbsent(taskInfo.task, taskInfo); - containerToHostMap.put(container.getId(), host); + containerToInstanceMap.put(container.getId(), host.getWorkerIdentity()); } finally { writeLock.unlock(); } @@ -683,16 +655,17 @@ public void shutdown() { @VisibleForTesting static class NodeInfo implements Delayed { private final float constBackOffFactor; - final String hostname; + final ServiceInstance host; private final Clock clock; long expireTimeMillis = -1; private long numSuccessfulTasks = 0; private long numSuccessfulTasksAtLastBlacklist = -1; float cumulativeBackoffFactor = 1.0f; + private boolean busy; - NodeInfo(String hostname, float backoffFactor, Clock clock) { - this.hostname = hostname; + NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) { + this.host = host; constBackOffFactor = backoffFactor; this.clock = clock; } 
@@ -716,9 +689,18 @@ void disableNode(long duration) { } void registerTaskSuccess() { + this.busy = false; // if a task exited, we might have free slots numSuccessfulTasks++; } + public void setBusy(boolean busy) { + this.busy = busy; + } + + public boolean isBusy() { + return busy; + } + @Override public long getDelay(TimeUnit unit) { return expireTimeMillis - clock.getTime(); @@ -740,7 +722,7 @@ public int compareTo(Delayed o) { public String toString() { return "NodeInfo{" + "constBackOffFactor=" + constBackOffFactor + - ", hostname='" + hostname + '\'' + + ", host=" + host + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" + numSuccessfulTasks + ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist + @@ -837,7 +819,7 @@ private void _registerAllocationInHostMap(String host, Map 0 } if not exists(output): diff --git llap-server/src/main/resources/templates.py llap-server/src/main/resources/templates.py index 945754a..173aefb 100644 --- llap-server/src/main/resources/templates.py +++ llap-server/src/main/resources/templates.py @@ -78,7 +78,11 @@ "site.global.daemon_args": "%(daemon_args)s", "site.global.library_path": "%(hadoop_home)s/lib/native", "site.global.memory_val": "%(heap)d", - "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid" + "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid", + "internal.chaos.monkey.probability.amlaunchfailure": "0", + "internal.chaos.monkey.probability.containerfailure": "%(monkey_percentage)d", + "internal.chaos.monkey.interval.seconds": "%(monkey_interval)d", + "internal.chaos.monkey.enabled": "%(monkey_enabled)s" }, "components": { "slider-appmaster": { @@ -115,5 +119,4 @@ slider destroy %(name)s slider install-package --name LLAP --package $BASEDIR/llap-%(version)s.zip --replacepkg slider create %(name)s --resources $BASEDIR/resources.json --template $BASEDIR/appConfig.json -slider status %(name)s """ diff --git 
llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index efd4c50..1319677 100644 --- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -46,6 +46,7 @@ import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mortbay.log.Log; public class TestLlapTaskSchedulerService { @@ -96,7 +97,6 @@ public void testNodeDisabled() { try { Priority priority1 = Priority.newInstance(1); String[] hosts1 = new String[]{HOST1}; - Object task1 = new Object(); Object clientCookie1 = new Object(); tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1); @@ -111,11 +111,10 @@ public void testNodeDisabled() { // Verify that the node is blacklisted assertEquals(1, tsWrapper.ts.dagStats.numRejectedTasks); - assertEquals(2, tsWrapper.ts.activeHosts.size()); - assertEquals(2, tsWrapper.ts.activeHostList.length); + assertEquals(2, tsWrapper.ts.instanceToNodeMap.size()); LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodes.peek(); assertNotNull(disabledNodeInfo); - assertEquals(HOST1, disabledNodeInfo.hostname); + assertEquals(HOST1, disabledNodeInfo.host.getHost()); assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.NANOSECONDS)); assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis); @@ -164,8 +163,7 @@ public void testNodeReEnabled() throws InterruptedException { // Verify that the node is blacklisted assertEquals(3, tsWrapper.ts.dagStats.numRejectedTasks); - assertEquals(0, tsWrapper.ts.activeHosts.size()); - assertEquals(0, tsWrapper.ts.activeHostList.length); + assertEquals(0, tsWrapper.ts.instanceToNodeMap.size()); assertEquals(3, tsWrapper.ts.disabledNodes.size());