diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java new file mode 100644 index 0000000..7d610a8 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.Resource; + +public interface ServiceInstance { + + /* is a UUID for LLAP registry installs, but host:port for the host installs */ + + public String getWorkerIdentity() ; + + public String getHost() ; + + public int getRpcPort() ; + + public int getShufflePort() ; + + public boolean isAlive(); + + public Map getProperties() ; + + public Resource getResource(); +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java new file mode 100644 index 0000000..47aec16 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public interface ServiceInstanceSet { + + public Map getInstances(); + + public ServiceInstance getInstance(String name); + + public Set getByHost(String host); + + public void refresh() throws IOException; + +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java new file mode 100644 index 0000000..884973e --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.hive.llap.daemon.registry; + +import java.io.IOException; + +public interface ServiceRegistry { + + public void start() throws InterruptedException; + + public void stop() throws InterruptedException; + + public void register() throws IOException; + + public void unregister() throws IOException; + + public ServiceInstanceSet getInstances(String component) throws IOException; +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java new file mode 100644 index 0000000..51c0c27 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java @@ -0,0 +1,171 @@ +package org.apache.hadoop.hive.llap.daemon.registry.impl; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Resource; + +public class LlapFixedRegistryImpl implements ServiceRegistry { + private final int port; + private final int shuffle; + private final String[] hosts; + private final int memory; + private final int vcores; + + private final Map srv = new HashMap(); + + public LlapFixedRegistryImpl(String hosts, Configuration conf) { + this.hosts = hosts.split(","); + this.port = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + this.shuffle = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + + for (Map.Entry kv : conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.put(kv.getKey(), kv.getValue()); + } + } + + this.memory = + conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, + LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); + this.vcores = + conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, + LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT); + } + + @Override + public void start() throws InterruptedException { + // nothing to start + } + + @Override + public void stop() throws InterruptedException { + // nothing to stop + } + + @Override + public void register() throws IOException { + // nothing to register + } + + @Override + public void unregister() throws IOException { + // nothing to unregister + } + + public static String getWorkerIdentity(String host) { + // trigger clean errors for anyone who mixes up identity with hosts + return "host-" + host; + } + + private final class FixedServiceInstance implements ServiceInstance { + + private final String host; + + public FixedServiceInstance(String host) { + this.host = host; + } + + @Override + public String getWorkerIdentity() { + return LlapFixedRegistryImpl.getWorkerIdentity(host); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + // TODO: allow >1 port per host? + return LlapFixedRegistryImpl.this.port; + } + + @Override + public int getShufflePort() { + return LlapFixedRegistryImpl.this.shuffle; + } + + @Override + public boolean isAlive() { + return true; + } + + @Override + public Map getProperties() { + Map properties = new HashMap<>(srv); + // no worker identity + return properties; + } + + @Override + public Resource getResource() { + return Resource.newInstance(memory, vcores); + } + + } + + private final class FixedServiceInstanceSet implements ServiceInstanceSet { + + private final Map instances = new HashMap(); + + public FixedServiceInstanceSet() { + for (String host : hosts) { + // trigger bugs in anyone who uses this as a hostname + instances.put(getWorkerIdentity(host), new FixedServiceInstance(host)); + } + } + + @Override + public Map getInstances() { + return instances; + } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public Set getByHost(String host) { + Set byHost = new HashSet(); + ServiceInstance inst = getInstance(getWorkerIdentity(host)); + if (inst != null) { + byHost.add(inst); + } + return byHost; + } + + @Override + public void refresh() throws IOException { + // I will do no such thing + } + + } + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + return new FixedServiceInstanceSet(); + } + + @Override + public String toString() { + return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts)); + } +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java index 67596fe..321e463 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java @@ -1,16 +1,16 @@ package org.apache.hadoop.hive.llap.daemon.registry.impl; import java.io.IOException; -import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; -import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; @@ -30,25 +30,8 @@ public class LlapRegistryService extends AbstractService { private static final Logger LOG = Logger.getLogger(LlapRegistryService.class); - - public final static String SERVICE_CLASS = "org-apache-hive"; - - private RegistryOperationsService client; - private String instanceName; - private Configuration conf; - private ServiceRecordMarshal encoder; - - private static final String hostname; - static { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException uhe) { - // ignore - } - hostname = localhost; - } + private ServiceRegistry registry = null; public LlapRegistryService() { super("LlapRegistryService"); @@ -56,92 +39,48 @@ public LlapRegistryService() { @Override public void serviceInit(Configuration conf) { - String registryId = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - if (registryId.startsWith("@")) { - LOG.info("Llap Registry is enabled with registryid: " + registryId); - this.conf = new Configuration(conf); - conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - // registry reference - instanceName = registryId.substring(1); - client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); - encoder = new RegistryUtils.ServiceRecordMarshal(); - + String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); + if (hosts.startsWith("@")) { + registry = initRegistry(hosts.substring(1), conf); } else { - LOG.info("Llap Registry is disabled"); + registry = new LlapFixedRegistryImpl(hosts, conf); } + LOG.info("Using LLAP registry type " + registry); + } + + private ServiceRegistry initRegistry(String instanceName, Configuration conf) { + return new LlapYarnRegistryImpl(instanceName, conf); } @Override public void serviceStart() throws Exception { - if (client != null) { - client.start(); + if (this.registry != null) { + this.registry.start(); } } + @Override public void serviceStop() throws Exception { - if (client != null) { - client.stop(); + if (this.registry != null) { + this.registry.start(); + } else { + LOG.warn("Stopping non-existent registry service"); } } - public Endpoint getRpcEndpoint() { - final int rpcPort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); - - return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); - } - - public Endpoint getShuffleEndpoint() { - final int shufflePort = - conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, - LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); - // HTTP today, but might not be - return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, - shufflePort); - } - - private final String getPath() { - return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), - SERVICE_CLASS, instanceName, "workers"), "worker-"); - } - public void registerWorker() throws IOException { - if (this.client != null) { - String path = getPath(); - ServiceRecord srv = new ServiceRecord(); - srv.addInternalEndpoint(getRpcEndpoint()); - srv.addInternalEndpoint(getShuffleEndpoint()); - - for (Map.Entry kv : this.conf) { - if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)) { - // TODO: read this somewhere useful, like the allocator - srv.set(kv.getKey(), kv.getValue()); - } - } - - client.mknode(RegistryPathUtils.parentOf(path), true); - - // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths - client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), - client.getClientAcls()); + if (this.registry != null) { + this.registry.register(); } } - public void unregisterWorker() { - if (this.client != null) { - // with ephemeral nodes, there's nothing to do here - // because the create didn't return paths + public void unregisterWorker() throws IOException { + if (this.registry != null) { + this.registry.unregister(); } } - public Map getWorkers() throws IOException { - if (this.client != null) { - String path = getPath(); - return RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); - } else { - Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); - return null; - } + public ServiceInstanceSet getInstances() throws IOException { + return this.registry.getInstances("LLAP"); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java new file mode 100644 index 0000000..5d7c68f --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java @@ -0,0 +1,288 @@ +package org.apache.hadoop.hive.llap.daemon.registry.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ProtocolTypes; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; + +import com.google.common.base.Preconditions; + +public class LlapYarnRegistryImpl implements ServiceRegistry { + + private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class); + + private RegistryOperationsService client; + private String instanceName; + private Configuration conf; + private ServiceRecordMarshal encoder; + + private static final UUID uniq = UUID.randomUUID(); + private static final String hostname; + + private static final String UNIQUE_IDENTIFIER = "llap.unique.id"; + + private final static String SERVICE_CLASS = "org-apache-hive"; + + final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1); + + static { + String localhost = "localhost"; + try { + localhost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException uhe) { + // ignore + } + hostname = localhost; + } + + public LlapYarnRegistryImpl(String instanceName, Configuration conf) { + + LOG.info("Llap Registry is enabled with registryid: " + instanceName); + this.conf = new Configuration(conf); + this.instanceName = instanceName; + conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + // registry reference + client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf); + encoder = new RegistryUtils.ServiceRecordMarshal(); + + } + + public Endpoint getRpcEndpoint() { + final int rpcPort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, + LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); + return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort)); + } + + public Endpoint getShuffleEndpoint() { + final int shufflePort = + conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT, + LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT); + // HTTP today, but might not be + return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname, + shufflePort); + } + + private final String getPath() { + return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), + SERVICE_CLASS, instanceName, "workers"), "worker-"); + } + + @Override + public void register() throws IOException { + String path = getPath(); + ServiceRecord srv = new ServiceRecord(); + srv.addInternalEndpoint(getRpcEndpoint()); + srv.addInternalEndpoint(getShuffleEndpoint()); + + for (Map.Entry kv : this.conf) { + if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX) + || kv.getKey().startsWith("hive.llap.")) { + // TODO: read this somewhere useful, like the task scheduler + srv.set(kv.getKey(), kv.getValue()); + } + } + + // restart sensitive instance id + srv.set(UNIQUE_IDENTIFIER, uniq.toString()); + + client.mknode(RegistryPathUtils.parentOf(path), true); + + // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths + client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv), + client.getClientAcls()); + } + + @Override + public void unregister() throws IOException { + // Nothing for the zkCreate models + } + + private class DynamicServiceInstance implements ServiceInstance { + private final ServiceRecord srv; + private boolean alive = true; + private final String host; + private final int rpcPort; + private final int shufflePort; + + public DynamicServiceInstance(ServiceRecord srv) throws IOException { + this.srv = srv; + + final Endpoint shuffle = srv.getInternalEndpoint("shuffle"); + final Endpoint rpc = srv.getInternalEndpoint("llap"); + + this.host = + RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = + Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + this.shufflePort = + Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } + + @Override + public String getWorkerIdentity() { + return srv.get(UNIQUE_IDENTIFIER); + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getRpcPort() { + return rpcPort; + } + + @Override + public int getShufflePort() { + return shufflePort; + } + + @Override + public boolean isAlive() { + return alive ; + } + + public void kill() { + this.alive = false; + } + + @Override + public Map getProperties() { + return srv.attributes(); + } + + @Override + public Resource getResource() { + int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); + int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS)); + return Resource.newInstance(memory, vCores); + } + } + + private class DynamicServiceInstanceSet implements ServiceInstanceSet { + + Map instances; + + @Override + public Map getInstances() { + return instances; + } + + @Override + public ServiceInstance getInstance(String name) { + return instances.get(name); + } + + @Override + public synchronized void refresh() throws IOException { + /* call this from wherever */ + Map freshInstances = new HashMap(); + + String path = getPath(); + Map records = + RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path)); + + for (ServiceRecord rec : records.values()) { + ServiceInstance instance = new DynamicServiceInstance(rec); + freshInstances.put(instance.getWorkerIdentity(), instance); + } + + if (instances != null) { + Set oldKeys = instances.keySet(); + Set newKeys = freshInstances.keySet(); + if (oldKeys.removeAll(newKeys)) { + for (String k : oldKeys) { + // this is so that people can hold onto ServiceInstance references as placeholders for tasks + ((DynamicServiceInstance) instances.get(k)).kill(); + } + } + } + instances = freshInstances; + } + + @Override + public Set getByHost(String host) { + Set byHost = new HashSet(); + for (ServiceInstance i : instances.values()) { + if (host.equals(i.getHost())) { + // all hosts in instances should be alive in this impl + byHost.add(i); + } + } + return byHost; + } + } + + final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet(); + + @Override + public ServiceInstanceSet getInstances(String component) throws IOException { + Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component + if (this.client != null) { + instances.refresh(); + return instances; + } else { + Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized"); + return null; + } + } + + @Override + public void start() { + if (client != null) { + client.start(); + /*refresher.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + instances.refresh(); + } catch (IOException ioe) { + LOG.warn("Could not refresh hosts on the timer ", ioe); + } + } + }, 0, 5, TimeUnit.SECONDS);*/ + } + } + + @Override + public void stop() { + if (client != null) { + client.stop(); + } + } +} diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index a50b159..4b504e1 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -42,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; @@ -68,12 +70,18 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; +/** + * @author gvijayaraghavan + * + */ public class LlapTaskSchedulerService extends TaskSchedulerService { private static final Log LOG = LogFactory.getLog(LlapTaskSchedulerService.class); @@ -83,19 +91,16 @@ private final ExecutorService appCallbackExecutor; private final TaskSchedulerAppCallback appClientDelegate; - // Set of active hosts - @VisibleForTesting - final LinkedHashMap activeHosts = new LinkedHashMap<>(); - // Populated each time activeHosts is modified - @VisibleForTesting - String []activeHostList; + // interface into the registry service + private ServiceInstanceSet activeInstances; - // Set of all hosts in the system. @VisibleForTesting - final ConcurrentMap allHosts = new ConcurrentHashMap<>(); + final Map instanceToNodeMap = new HashMap<>(); + + @VisibleForTesting // Tracks currently allocated containers. - private final Map containerToHostMap = new HashMap<>(); + final Map containerToInstanceMap = new HashMap<>(); // Tracks tasks which could not be allocated immediately. @VisibleForTesting @@ -116,7 +121,6 @@ public int compare(Priority o1, Priority o2) { private final ContainerFactory containerFactory; private final Random random = new Random(); - private final int containerPort; private final Clock clock; private final ListeningExecutorService executor; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -144,7 +148,6 @@ public int compare(Priority o1, Priority o2) { // Per Executor Thread private final Resource resourcePerExecutor; - private final boolean initFromRegistry; private final LlapRegistryService registry = new LlapRegistryService(); private final PendingTaskSchedulerCallable pendingTaskSchedulerCallable = new PendingTaskSchedulerCallable(); private ListenableFuture pendingTaskSchedulerFuture; @@ -156,10 +159,6 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); - - - - public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier, @@ -192,118 +191,39 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a Preconditions.checkNotNull(instanceId, LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + " must be defined"); - if (!instanceId.startsWith("@")) { // Manual setup. Not via the service registry - initFromRegistry = false; - String[] hosts = conf.getTrimmedStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS); - Preconditions.checkState(hosts != null && hosts.length != 0, - LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS + "must be defined"); - for (String host : hosts) { - // If reading addresses from conf, try resolving local addresses so that - // this matches with the address reported by daemons. - InetAddress inetAddress = null; - try { - inetAddress = InetAddress.getByName(host); - if (NetUtils.isLocalAddress(inetAddress)) { - InetSocketAddress socketAddress = new InetSocketAddress(0); - socketAddress = NetUtils.getConnectAddress(socketAddress); - LOG.info("Adding host identified as local: " + host + " as " + socketAddress.getHostName()); - host = socketAddress.getHostName(); - } - } catch (UnknownHostException e) { - LOG.warn("Ignoring resolution issues for host: " + host, e); - } - NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); - activeHosts.put(host, nodeInfo); - allHosts.put(host, nodeInfo); - } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); - } else { - initFromRegistry = true; - } - - this.containerPort = conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT, - LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT); ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); executor = MoreExecutors.listeningDecorator(executorService); - if (activeHosts.size() > 0) { - LOG.info("Running with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + allHosts.keySet() + - ", rpcPort=" + containerPort + - ", nodeReEnableTimeout=" + nodeReEnableTimeout + - ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR); - } else { - LOG.info("Running with configuration: " + + LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + - ", rpcPort=" + ", nodeReEnableTimeout=" + nodeReEnableTimeout + ", nodeReEnableBackOffFactor=" + BACKOFF_FACTOR); - } - } @Override public void serviceInit(Configuration conf) { - if (initFromRegistry) { - registry.init(conf); - } + registry.init(conf); } @Override public void serviceStart() throws IOException { - writeLock.lock(); try { pendingTaskSchedulerFuture = executor.submit(pendingTaskSchedulerCallable); - if (initFromRegistry) { - registry.start(); - if (activeHosts.size() > 0) { - return; - } - LOG.info("Reading YARN registry for service records"); - - Map workers = registry.getWorkers(); - for (ServiceRecord srv : workers.values()) { - Endpoint rpc = srv.getInternalEndpoint("llap"); - if (rpc != null) { - LOG.info("Examining endpoint: " + rpc); - final String host = - RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - NodeInfo nodeInfo = new NodeInfo(host, BACKOFF_FACTOR, clock); - activeHosts.put(host, nodeInfo); - allHosts.put(host, nodeInfo); - } else { - - LOG.info("The SRV record was " + srv); - } - } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); - - - - LOG.info("Re-inited with configuration: " + - "memoryPerInstance=" + memoryPerInstance + - ", vCoresPerInstance=" + coresPerInstance + - ", executorsPerInstance=" + executorsPerInstance + - ", resourcePerInstanceInferred=" + resourcePerExecutor + - ", hosts=" + allHosts.keySet()); + registry.start(); + activeInstances = registry.getInstances(); + for (ServiceInstance inst : activeInstances.getInstances().values()) { + instanceToNodeMap.put(inst, new NodeInfo(inst, BACKOFF_FACTOR, clock)); } } finally { writeLock.unlock(); } - } @Override @@ -316,33 +236,81 @@ public void serviceStop() { pendingTaskSchedulerFuture.cancel(true); } executor.shutdownNow(); - if (initFromRegistry) { - registry.stop(); - } + registry.stop(); appCallbackExecutor.shutdownNow(); } } finally { writeLock.unlock(); } } + + + @Override + public Resource getTotalResources() { + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : activeInstances.getInstances().values()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); + } + + /** + * The difference between this and getTotalResources() is that this only gives + * currently free resource instances, while the other lists all the instances + * that may become available in a while. + */ @Override public Resource getAvailableResources() { - // TODO This needs information about all running executors, and the amount of memory etc available across the cluster. - // No lock required until this moves to using something other than allHosts - return Resource - .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance), - allHosts.size() * coresPerInstance); + // need a state store eventually for current state & measure backoffs + int memory = 0; + int vcores = 0; + readLock.lock(); + try { + for (ServiceInstance inst : instanceToNodeMap.keySet()) { + if (inst.isAlive()) { + Resource r = inst.getResource(); + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + } + } finally { + readLock.unlock(); + } + + return Resource.newInstance(memory, vcores); } @Override public int getClusterNodeCount() { - // No lock required until this moves to using something other than allHosts - return allHosts.size(); + readLock.lock(); + try { + int n = 0; + for (ServiceInstance inst : activeInstances.getInstances().values()) { + if (inst.isAlive()) { + n++; + } + } + return n; + } finally { + readLock.unlock(); + } } @Override public void resetMatchLocalityForAllHeldContainers() { + new Exception("Resetting things").printStackTrace(); // This is effectively DAG completed, and can be used to reset statistics being tracked. LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats); dagCounter.incrementAndGet(); @@ -350,14 +318,6 @@ public void resetMatchLocalityForAllHeldContainers() { } @Override - public Resource getTotalResources() { - // No lock required until this moves to using something other than allHosts - return Resource - .newInstance(Ints.checkedCast(allHosts.size() * memoryPerInstance), - allHosts.size() * coresPerInstance); - } - - @Override public void blacklistNode(NodeId nodeId) { LOG.info("DEBUG: BlacklistNode not supported"); } @@ -428,14 +388,14 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } return false; } - String hostForContainer = containerToHostMap.remove(taskInfo.containerId); + String hostForContainer = containerToInstanceMap.remove(taskInfo.containerId); assert hostForContainer != null; - String assignedHost = taskInfo.assignedHost; - assert assignedHost != null; + ServiceInstance assignedInstance = taskInfo.assignedInstance; + assert assignedInstance != null; if (taskSucceeded) { // The node may have been blacklisted at this point - which means it may not be in the activeNodeList. - NodeInfo nodeInfo = allHosts.get(assignedHost); + NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance); assert nodeInfo != null; nodeInfo.registerTaskSuccess(); // TODO Consider un-blacklisting the node since at least 1 slot should have become available on the node. @@ -443,11 +403,11 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd .of(TaskAttemptEndReason.SERVICE_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR) .contains(endReason)) { if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) { - dagStats.registerCommFailure(taskInfo.assignedHost); + dagStats.registerCommFailure(taskInfo.assignedInstance.getHost()); } else if (endReason == TaskAttemptEndReason.SERVICE_BUSY) { - dagStats.registerTaskRejected(taskInfo.assignedHost); + dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost()); } - disableNode(assignedHost); + disableInstance(assignedInstance); } } finally { writeLock.unlock(); @@ -489,71 +449,86 @@ TaskSchedulerAppCallback createAppCallbackDelegate( * @param requestedHosts the list of preferred hosts. null implies any host * @return */ - private String selectHost(String[] requestedHosts) { - // TODO Change this to work off of what we think is remaining capacity for a host - + private ServiceInstance selectHost(TaskInfo request) { + String[] requestedHosts = request.requestedHosts; readLock.lock(); // Read-lock. Not updating any stats at the moment. try { - // Check if any hosts are active. If there's any active host, an allocation will happen. - if (activeHosts.size() == 0) { + // Check if any hosts are active. + if (getAvailableResources().getMemory() <= 0) { + refreshInstances(); + } + + // If there's no memory available, fail + if (getTotalResources().getMemory() <=0) { return null; } - String host = null; - if (requestedHosts != null && requestedHosts.length > 0) { + if (requestedHosts != null) { + for (String host : requestedHosts) { // Pick the first host always. Weak attempt at cache affinity. - host = requestedHosts[0]; - if (activeHosts.get(host) != null) { - LOG.info("Selected host: " + host + " from requested hosts: " + - Arrays.toString(requestedHosts)); - } else { - LOG.info("Preferred host: " + host + " not present. Attempting to select another one"); - host = null; - for (String h : requestedHosts) { - if (activeHosts.get(h) != null) { - host = h; - break; + Set instances = activeInstances.getByHost(host); + if (!instances.isEmpty()) { + for (ServiceInstance inst : instances) { + if (inst.isAlive() && instanceToNodeMap.containsKey(inst)) { + // only allocate from the "available" list + // TODO Change this to work off of what we think is remaining capacity for an instance + return inst; } } - if (host == null) { - host = activeHostList[random.nextInt(activeHostList.length)]; - LOG.info("Requested hosts: " + Arrays.toString(requestedHosts) + - " not present. Randomizing the host"); - } } - } else { - host = activeHostList[random.nextInt(activeHostList.length)]; - LOG.info("Selected random host: " + host + " since the request contained no host information"); + }} + /* fall through - miss in locality (random scheduling) */ + + // only allocate from the "available" list + ServiceInstance[] all = instanceToNodeMap.keySet().toArray(new ServiceInstance[0]); + + if (all.length == 0) { + return null; + } + + int n = random.nextInt(all.length); + // start at random offset and iterate whole list + for (int i = 0; i < all.length ; i++) { + ServiceInstance inst = all[(i+n) % all.length]; + if (inst.isAlive()) { + return inst; + } } - return host; } finally { readLock.unlock(); } + /* no matches */ + return null; } + private void refreshInstances() { + try { + activeInstances.refresh(); // handles its own sync + } catch(IOException ioe) { + LOG.warn("Could not refresh list of active instances", ioe); + } + } + private void reenableDisabledNode(NodeInfo nodeInfo) { writeLock.lock(); try { nodeInfo.enableNode(); - activeHosts.put(nodeInfo.hostname, nodeInfo); - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); } finally { writeLock.unlock(); } } - private void disableNode(String hostname) { + private void disableInstance(ServiceInstance instance) { writeLock.lock(); try { - NodeInfo nodeInfo = activeHosts.remove(hostname); + NodeInfo nodeInfo = instanceToNodeMap.remove(instance); if (nodeInfo == null) { - LOG.debug("Node: " + hostname + " already disabled, or invalid. Not doing anything."); + LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything."); } else { nodeInfo.disableNode(nodeReEnableTimeout); disabledNodes.add(nodeInfo); } - activeHostList = activeHosts.keySet().toArray(new String[activeHosts.size()]); } finally { writeLock.unlock(); } @@ -625,18 +600,18 @@ private void schedulePendingTasks() { } private boolean scheduleTask(TaskInfo taskInfo) { - String host = selectHost(taskInfo.requestedHosts); + ServiceInstance host = selectHost(taskInfo); if (host == null) { return false; } else { Container container = - containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host, containerPort); + containerFactory.createContainer(resourcePerExecutor, taskInfo.priority, host.getHost(), host.getRpcPort()); writeLock.lock(); // While updating local structures try { - dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, host); + dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, host.getHost()); taskInfo.setAssignmentInfo(host, container.getId()); knownTasks.putIfAbsent(taskInfo.task, taskInfo); - containerToHostMap.put(container.getId(), host); + containerToInstanceMap.put(container.getId(), host.getWorkerIdentity()); } finally { writeLock.unlock(); } @@ -683,7 +658,7 @@ public void shutdown() { @VisibleForTesting static class NodeInfo implements Delayed { private final float constBackOffFactor; - final String hostname; + final ServiceInstance host; private final Clock clock; long expireTimeMillis = -1; @@ -691,8 +666,8 @@ public void shutdown() { private long numSuccessfulTasksAtLastBlacklist = -1; float cumulativeBackoffFactor = 1.0f; - NodeInfo(String hostname, float backoffFactor, Clock clock) { - this.hostname = hostname; + NodeInfo(ServiceInstance host, float backoffFactor, Clock clock) { + this.host = host; constBackOffFactor = backoffFactor; this.clock = clock; } @@ -740,7 +715,7 @@ public int compareTo(Delayed o) { public String toString() { return "NodeInfo{" + "constBackOffFactor=" + constBackOffFactor + - ", hostname='" + hostname + '\'' + + ", host=" + host + ", expireTimeMillis=" + expireTimeMillis + ", numSuccessfulTasks=" + numSuccessfulTasks + ", numSuccessfulTasksAtLastBlacklist=" + numSuccessfulTasksAtLastBlacklist + @@ -795,6 +770,7 @@ void registerTaskAllocated(String[] requestedHosts, String [] requestedRacks, St if (requestedHosts != null && requestedHosts.length != 0) { Set requestedHostSet = new HashSet<>(Arrays.asList(requestedHosts)); if (requestedHostSet.contains(allocatedHost)) { + new Exception("Really?").printStackTrace(); numLocalAllocations++; _registerAllocationInHostMap(allocatedHost, localityBasedNumAllocationsPerHost); } else { @@ -837,7 +813,7 @@ private void _registerAllocationInHostMap(String host, Map