diff --git llap-client/pom.xml llap-client/pom.xml
index ff7c82c..bda23df 100644
--- llap-client/pom.xml
+++ llap-client/pom.xml
@@ -77,6 +77,12 @@
${hadoop.version}
true
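+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>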
junit
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
new file mode 100644
index 0000000..f116de4
--- /dev/null
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public interface ServiceInstance {
+
+  /**
+   * Worker identity is a UUID (unique across restarts), to identify a node which died and was
+   * brought back on the same host/port.
+   *
+   * @return the worker identity of this instance
+   */
+  public String getWorkerIdentity();
+
+  /**
+   * Hostname of the service instance.
+   *
+   * @return the host name
+   */
+  public String getHost();
+
+  /**
+   * RPC endpoint for the service instance.
+   *
+   * @return the RPC port
+   */
+  public int getRpcPort();
+
+  /**
+   * Shuffle endpoint for the service instance.
+   *
+   * @return the shuffle port
+   */
+  public int getShufflePort();
+
+  /**
+   * Return the last known state (without refreshing).
+   *
+   * @return the last known liveness of the instance
+   */
+  public boolean isAlive();
+
+  /**
+   * Config properties of the service instance (llap.daemon.*).
+   *
+   * @return the configuration properties, keyed by property name
+   */
+  public Map<String, String> getProperties();
+
+  /**
+   * Memory and executors available for the LLAP tasks.
+   *
+   * This does not include the size of the cache or the actual vCores allocated via Slider.
+   *
+   * @return the memory and executor count available for tasks
+   */
+  public Resource getResource();
+}
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
new file mode 100644
index 0000000..388b5f3
--- /dev/null
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public interface ServiceInstanceSet {
+
+  /**
+   * Get an instance mapping which maps worker identity to each instance.
+   *
+   * The worker identity does not collide between restarts, so each restart will have a unique id,
+   * while having the same host/ip pair.
+   *
+   * @return the known instances, keyed by worker identity
+   */
+  public Map<String, ServiceInstance> getAll();
+
+  /**
+   * Get an instance by worker identity.
+   *
+   * @param name the worker identity to look up
+   * @return the instance registered under that identity
+   */
+  public ServiceInstance getInstance(String name);
+
+  /**
+   * Get a list of service instances for a given host.
+   *
+   * The list could include dead and alive instances.
+   *
+   * @param host the host name to match
+   * @return the instances known for that host
+   */
+  public Set<ServiceInstance> getByHost(String host);
+
+  /**
+   * Refresh the instance set from registry backing store.
+   *
+   * @throws IOException declared for implementations that read from a backing store
+   */
+  public void refresh() throws IOException;
+
+}
\ No newline at end of file
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
new file mode 100644
index 0000000..d3fb517
--- /dev/null
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry;
+
+import java.io.IOException;
+
+/**
+ * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
+ */
+public interface ServiceRegistry {
+
+ /**
+ * Start the service registry
+ *
+ * @throws InterruptedException declared so implementations may block (and be interrupted) here
+ */
+ public void start() throws InterruptedException;
+
+ /**
+ * Stop the service registry
+ *
+ * @throws InterruptedException declared so implementations may block (and be interrupted) here
+ */
+ public void stop() throws InterruptedException;
+
+ /**
+ * Register the current instance - the implementation takes care of the endpoints to register.
+ *
+ * @throws IOException declared for implementations that write to a backing store
+ */
+ public void register() throws IOException;
+
+ /**
+ * Remove the current registration cleanly (implementation defined cleanup)
+ *
+ * @throws IOException declared for implementations that write to a backing store
+ */
+ public void unregister() throws IOException;
+
+ /**
+ * Client API to get the list of instances registered via the current registry key.
+ *
+ * @param component name of the component whose instances are requested
+ * @return the set of service instances for that component
+ * @throws IOException declared for implementations that read from a backing store
+ */
+ public ServiceInstanceSet getInstances(String component) throws IOException;
+}
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
new file mode 100644
index 0000000..bdf19ec
--- /dev/null
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.log4j.Logger;
+
+public class LlapFixedRegistryImpl implements ServiceRegistry {
+
+ private static final Logger LOG = Logger.getLogger(LlapFixedRegistryImpl.class);
+
+ // Configuration key that switches host-name resolution; primarily for testing to avoid the lookup.
+ @InterfaceAudience.Private
+ public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = "fixed.registry.resolve.host.names";
+
+ private final int port; // RPC port, shared by every configured host
+ private final int shuffle; // shuffle port, shared by every configured host
+ private final String[] hosts; // host names split from the constructor argument
+ private final int memory; // value of LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, reported by getResource()
+ private final int vcores; // value of LLAP_DAEMON_NUM_EXECUTORS, reported by getResource()
+ private final boolean resolveHosts; // whether instances try to resolve local host names
+
+ private final Map<String, String> srv = new HashMap<>(); // LLAP-related configuration entries
+
+ /**
+  * Builds a registry over a fixed list of hosts.
+  * @param hosts comma-separated host names (split on "," as-is, without trimming)
+  * @param conf source of the ports, resource sizes and LLAP properties to expose
+  */
+ public LlapFixedRegistryImpl(String hosts, Configuration conf) {
+   this.hosts = hosts.split(",");
+   this.port = conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
+       LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+   this.shuffle = conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+       LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+   this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
+   // Configuration iterates as String key/value entries; keep only the LLAP-related ones.
+   for (Map.Entry<String, String> kv : conf) {
+     if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
+         || kv.getKey().startsWith("hive.llap.")
+         || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) {
+       // TODO: read this somewhere useful, like the task scheduler
+       srv.put(kv.getKey(), kv.getValue());
+     }
+   }
+   this.memory = conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+       LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
+   this.vcores = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+       LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
+ }
+
+ @Override
+ public void start() throws InterruptedException {
+ // No-op: the host list is fixed at construction time, so there is nothing to start.
+ }
+
+ @Override
+ public void stop() throws InterruptedException {
+ // No-op: start() acquires nothing, so there is nothing to stop.
+ }
+
+ @Override
+ public void register() throws IOException {
+ // No-op: instances come from the configured host list, so there is nothing to register.
+ }
+
+ @Override
+ public void unregister() throws IOException {
+ // No-op: register() records nothing, so there is nothing to unregister.
+ }
+
+ public static String getWorkerIdentity(String host) {
+   // The "host-" prefix makes it an obvious error to use an identity where a host is expected.
+   return "host-".concat(String.valueOf(host));
+ }
+
+ // A statically configured instance: one per listed host, always reported as alive.
+ private final class FixedServiceInstance implements ServiceInstance {
+
+   private final String host;
+
+   public FixedServiceInstance(String host) {
+     if (resolveHosts) {
+       try {
+         InetAddress inetAddress = InetAddress.getByName(host);
+         if (NetUtils.isLocalAddress(inetAddress)) {
+           // Local hosts are re-labelled with the name NetUtils reports for a wildcard address.
+           InetSocketAddress socketAddress = NetUtils.getConnectAddress(new InetSocketAddress(0));
+           String resolved = socketAddress.getHostName();
+           LOG.info("Adding host identified as local: " + host + " as " + resolved);
+           host = resolved;
+         }
+       } catch (UnknownHostException e) {
+         LOG.warn("Ignoring resolution issues for host: " + host, e);
+       }
+     }
+     this.host = host;
+   }
+
+   @Override
+   public String getWorkerIdentity() {
+     return LlapFixedRegistryImpl.getWorkerIdentity(host);
+   }
+
+   @Override
+   public String getHost() {
+     return host;
+   }
+
+   @Override
+   public int getRpcPort() {
+     // TODO: allow >1 port per host?
+     return LlapFixedRegistryImpl.this.port;
+   }
+
+   @Override
+   public int getShufflePort() {
+     return LlapFixedRegistryImpl.this.shuffle;
+   }
+
+   @Override
+   public boolean isAlive() {
+     return true;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+     // no worker identity; a copy is returned so callers cannot alter the registry's own map
+     return new HashMap<String, String>(srv);
+   }
+
+   @Override
+   public Resource getResource() {
+     return Resource.newInstance(memory, vcores);
+   }
+
+   @Override
+   public String toString() {
+     return "FixedServiceInstance{" +
+         "host=" + host +
+         ", memory=" + memory +
+         ", vcores=" + vcores +
+         '}';
+   }
+ }
+
+ // Instance set built once from the configured host list; refresh() is a no-op.
+ private final class FixedServiceInstanceSet implements ServiceInstanceSet {
+
+   private final Map<String, ServiceInstance> instances = new HashMap<String, ServiceInstance>();
+
+   public FixedServiceInstanceSet() {
+     for (String host : hosts) {
+       // trigger bugs in anyone who uses this as a hostname
+       instances.put(getWorkerIdentity(host), new FixedServiceInstance(host));
+     }
+   }
+
+   @Override
+   public Map<String, ServiceInstance> getAll() {
+     return instances;
+   }
+
+   @Override
+   public ServiceInstance getInstance(String name) {
+     return instances.get(name);
+   }
+
+   @Override
+   public Set<ServiceInstance> getByHost(String host) {
+     Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+     ServiceInstance inst = getInstance(getWorkerIdentity(host));
+     if (inst != null) {
+       byHost.add(inst);
+     }
+     return byHost;
+   }
+
+   @Override
+   public void refresh() throws IOException {
+     // I will do no such thing
+   }
+ }
+
+ @Override
+ public ServiceInstanceSet getInstances(String component) throws IOException {
+ return new FixedServiceInstanceSet(); // built anew on every call; component is not consulted
+ }
+
+ @Override
+ public String toString() {
+   return "FixedRegistry hosts=" + StringUtils.join(",", this.hosts);
+ }
+}
\ No newline at end of file
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
new file mode 100644
index 0000000..d3904fe
--- /dev/null
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.log4j.Logger;
+
+/**
+ * Hadoop service that picks a registry implementation from configuration and runs its lifecycle.
+ */
+public class LlapRegistryService extends AbstractService {
+
+  private static final Logger LOG = Logger.getLogger(LlapRegistryService.class);
+
+  private ServiceRegistry registry = null;
+  private final boolean isDaemon;
+
+  /** @param isDaemon when true, the instance is registered on start and unregistered on stop */
+  public LlapRegistryService(boolean isDaemon) {
+    super("LlapRegistryService");
+    this.isDaemon = isDaemon;
+  }
+
+  /**
+   * A hosts value starting with '@' selects the YARN registry; otherwise it is a fixed host list.
+   */
+  @Override
+  public void serviceInit(Configuration conf) {
+    String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    if (hosts == null) {
+      // getTrimmed() yields null for an unset property; fail with a message rather than an NPE.
+      throw new IllegalArgumentException(
+          "No LLAP service hosts configured: " + LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    }
+    if (hosts.startsWith("@")) {
+      registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
+    } else {
+      registry = new LlapFixedRegistryImpl(hosts, conf);
+    }
+    LOG.info("Using LLAP registry type " + registry);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    if (this.registry != null) {
+      this.registry.start();
+      if (isDaemon) {
+        this.registry.register();
+      }
+    }
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    if (this.registry == null) {
+      LOG.warn("Stopping non-existent registry service");
+      return;
+    }
+    if (isDaemon) {
+      this.registry.unregister();
+    }
+    this.registry.stop();
+  }
+
+  /** Returns the instances of the "LLAP" component; the service must be initialized first. */
+  public ServiceInstanceSet getInstances() throws IOException {
+    return this.registry.getInstances("LLAP");
+  }
+}
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
new file mode 100644
index 0000000..8336fdd
--- /dev/null
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+
+public class LlapYarnRegistryImpl implements ServiceRegistry {
+
+ private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class);
+
+ private final RegistryOperationsService client;
+ private final Configuration conf;
+ private final ServiceRecordMarshal encoder;
+ private final String path;
+ // Single instance set shared by every getInstances() call and by the background refresher.
+ private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
+ // Generated once per class load (static), so each process start publishes a new identity.
+ private static final UUID uniq = UUID.randomUUID();
+ private static final String hostname;
+ // Service-record attribute under which the identity above is published by register().
+ private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
+
+ private final static String SERVICE_CLASS = "org-apache-hive";
+ // One daemon thread that polls the registry; the task is scheduled in start().
+ final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
+ final long refreshDelay; // refresh period; start() schedules it in seconds
+ private final boolean isDaemon;
+
+ static {
+   String resolved;
+   try {
+     resolved = InetAddress.getLocalHost().getCanonicalHostName();
+   } catch (UnknownHostException uhe) {
+     resolved = "localhost"; // lookup failed: fall back to the literal default
+   }
+   hostname = resolved;
+ }
+
+ /** Creates the registry client and derives the registry path used for worker nodes. */
+ public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) {
+   LOG.info("Llap Registry is enabled with registryid: " + instanceName);
+   this.conf = new Configuration(conf);
+   conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+   // NOTE(review): the line above alters the caller's conf, not the copy; confirm this is intended.
+   client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
+   encoder = new RegistryUtils.ServiceRecordMarshal();
+   this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+       SERVICE_CLASS, instanceName, "workers"), "worker-");
+   refreshDelay = conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
+       LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
+   this.isDaemon = isDaemon;
+   // Guava's Preconditions only substitutes %s placeholders; %d would be printed literally.
+   Preconditions.checkArgument(refreshDelay > 0,
+       "Refresh delay for registry has to be positive = %s", refreshDelay);
+ }
+
+ /** Describes this host's LLAP RPC address as an IPC endpoint named "llap". */
+ public Endpoint getRpcEndpoint() {
+   final InetSocketAddress rpcAddress = new InetSocketAddress(hostname, conf.getInt(
+       LlapConfiguration.LLAP_DAEMON_RPC_PORT, LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT));
+   return RegistryTypeUtils.ipcEndpoint("llap", rpcAddress);
+ }
+
+ /** Describes this host's shuffle address as a TCP endpoint named "shuffle". */
+ public Endpoint getShuffleEndpoint() {
+   // Declared as plain TCP: shuffle is HTTP today, but might not be.
+   final String protocol = ProtocolTypes.PROTOCOL_TCP;
+   final int port = conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+       LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+   return RegistryTypeUtils.inetAddrEndpoint("shuffle", protocol, hostname, port);
+ }
+
+ /**
+  * Describes this host's web services address as an endpoint named "services".
+  * @return a web endpoint using https when the SSL setting is on, http otherwise
+  */
+ public Endpoint getServicesEndpoint() {
+   final int port = conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
+       LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
+   final String scheme = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
+       LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT) ? "https" : "http";
+   try {
+     final URL url = new URL(scheme, hostname, port, "");
+     return RegistryTypeUtils.webEndpoint("services", url.toURI());
+   } catch (MalformedURLException e) {
+     throw new RuntimeException(e);
+   } catch (URISyntaxException e) {
+     throw new RuntimeException("llap service URI for " + hostname + " is invalid", e);
+   }
+ }
+
+ private final String getPath() {
+   return path; // registry path prefix ("worker-") built in the constructor
+ }
+
+ // Publishes this process as an ephemeral, sequential node carrying its endpoints and settings.
+ @Override
+ public void register() throws IOException {
+   String path = getPath();
+   ServiceRecord srv = new ServiceRecord();
+   srv.addInternalEndpoint(getRpcEndpoint());
+   srv.addInternalEndpoint(getShuffleEndpoint());
+   srv.addExternalEndpoint(getServicesEndpoint());
+   // Copy the LLAP-related configuration entries into the record as attributes.
+   for (Map.Entry<String, String> kv : this.conf) {
+     if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
+         || kv.getKey().startsWith("hive.llap.")) {
+       // TODO: read this somewhere useful, like the task scheduler
+       srv.set(kv.getKey(), kv.getValue());
+     }
+   }
+   // restart sensitive instance id
+   srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+   // Create the parent node first; the worker node is created beneath it.
+   client.mknode(RegistryPathUtils.parentOf(path), true);
+
+   // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
+   client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
+       client.getClientAcls());
+ }
+
+ @Override
+ public void unregister() throws IOException {
+ // Nothing to do here: register() creates an EPHEMERAL_SEQUENTIAL node (see its zkCreate call).
+ }
+
+ // View over one ServiceRecord read from the registry; marked dead once its record is gone.
+ private class DynamicServiceInstance implements ServiceInstance {
+
+   private final ServiceRecord srv;
+   // volatile: written by kill() during a refresh, read through isAlive() without any lock
+   private volatile boolean alive = true;
+   private final String host;
+   private final int rpcPort;
+   private final int shufflePort;
+
+   public DynamicServiceInstance(ServiceRecord srv) throws IOException {
+     this.srv = srv;
+
+     final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
+     final Endpoint rpc = srv.getInternalEndpoint("llap");
+
+     this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+         AddressTypes.ADDRESS_HOSTNAME_FIELD);
+     this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+         AddressTypes.ADDRESS_PORT_FIELD));
+     this.shufflePort = Integer.parseInt(RegistryTypeUtils.getAddressField(
+         shuffle.addresses.get(0), AddressTypes.ADDRESS_PORT_FIELD));
+   }
+
+   @Override
+   public String getWorkerIdentity() {
+     return srv.get(UNIQUE_IDENTIFIER);
+   }
+
+   @Override
+   public String getHost() {
+     return host;
+   }
+
+   @Override
+   public int getRpcPort() {
+     return rpcPort;
+   }
+
+   @Override
+   public int getShufflePort() {
+     return shufflePort;
+   }
+
+   @Override
+   public boolean isAlive() {
+     return alive;
+   }
+
+   public void kill() {
+     // May be possible to generate a notification back to the scheduler from here.
+     LOG.info("Killing service instance: " + this);
+     this.alive = false;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+     return srv.attributes();
+   }
+
+   @Override
+   public Resource getResource() {
+     int memory = Integer.parseInt(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+     int vCores = Integer.parseInt(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
+     return Resource.newInstance(memory, vCores);
+   }
+
+   @Override
+   public String toString() {
+     return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort
+         + " with resources=" + getResource() +"]";
+   }
+
+   // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
+   // of an already known instance.
+ }
+
+ // Registry-backed instance set; every access to the map happens under this object's monitor.
+ private class DynamicServiceInstanceSet implements ServiceInstanceSet {
+
+   // LinkedHashMap to retain iteration order.
+   private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+
+   @Override
+   public synchronized Map<String, ServiceInstance> getAll() {
+     // Return a copy. Instances may be modified during a refresh.
+     return new LinkedHashMap<>(instances);
+   }
+
+   @Override
+   public synchronized ServiceInstance getInstance(String name) {
+     return instances.get(name);
+   }
+
+   /**
+    * Re-reads the service records and reconciles them with the known instances: unseen worker
+    * identities are added, known ones keep their existing object, and instances whose identity
+    * is no longer listed are marked dead and dropped from the set.
+    */
+   @Override
+   public void refresh() throws IOException {
+     /* call this from wherever */
+     Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
+     String path = getPath();
+     Map<String, ServiceRecord> records =
+         RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
+     // Synchronize after reading the service records from the external service (ZK)
+     synchronized (this) {
+       Set<String> latestKeys = new HashSet<String>();
+       LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
+       for (ServiceRecord rec : records.values()) {
+         ServiceInstance instance = new DynamicServiceInstance(rec);
+         if (!instances.containsKey(instance.getWorkerIdentity())) {
+           // add a new object
+           freshInstances.put(instance.getWorkerIdentity(), instance);
+           if (LOG.isInfoEnabled()) {
+             LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
+                 + instance);
+           }
+         } else {
+           if (LOG.isDebugEnabled()) {
+             LOG.debug("Retaining running worker " + instance.getWorkerIdentity() +
+                 " which mapped to " + instance);
+           }
+         }
+         latestKeys.add(instance.getWorkerIdentity());
+       }
+
+       // deep-copy before modifying
+       Set<String> oldKeys = new HashSet<>(instances.keySet());
+       // removeAll() only reports whether something was removed, i.e. whether any worker
+       // survived; its result must not gate the loop below, or a complete turnover of workers
+       // would drop every dead instance without ever marking it dead.
+       oldKeys.removeAll(latestKeys);
+       // This is all the records which have not checked in, and are effectively dead.
+       for (String k : oldKeys) {
+         // this is so that people can hold onto ServiceInstance references as placeholders for tasks
+         final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
+         dead.kill();
+         if (LOG.isInfoEnabled()) {
+           LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
+         }
+       }
+       // oldKeys contains the set of dead instances at this point.
+       this.instances.keySet().removeAll(oldKeys);
+       this.instances.putAll(freshInstances);
+     }
+   }
+
+   @Override
+   public synchronized Set<ServiceInstance> getByHost(String host) {
+     // TODO Maybe store this as a map which is populated during construction, to avoid walking
+     // the map on each request.
+     Set<ServiceInstance> byHost = new HashSet<ServiceInstance>();
+     for (ServiceInstance i : instances.values()) {
+       if (host.equals(i.getHost())) {
+         // all hosts in instances should be alive in this impl
+         byHost.add(i);
+       }
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Locality comparing " + host + " to " + i.getHost());
+       }
+     }
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
+     }
+     return byHost;
+   }
+ }
+
+ /**
+  * Refreshes from the registry and returns the shared instance set.
+  * @param component must be "LLAP"; right now there is only 1 component
+  */
+ @Override
+ public ServiceInstanceSet getInstances(String component) throws IOException {
+   Preconditions.checkArgument("LLAP".equals(component));
+   Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized");
+   instances.refresh();
+   return instances;
+ }
+
+ @Override
+ public void start() {
+   if (client == null) return;
+   client.start();
+   if (isDaemon) return; // the periodic refresh below is only scheduled for non-daemon users
+   refresher.scheduleWithFixedDelay(new Runnable() {
+     @Override
+     public void run() {
+       try {
+         instances.refresh();
+       } catch (IOException | RuntimeException e) { // an escaping exception cancels later runs
+         LOG.warn("Could not refresh hosts during scheduled refresh", e);
+       }
+     }
+   }, 0, refreshDelay, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void stop() {
+   // Stop the background refresh first so it cannot use the client while it shuts down.
+   refresher.shutdownNow();
+   if (client != null) client.stop();
+ }
+}
diff --git llap-server/pom.xml llap-server/pom.xml
index 42e53b6..c372c35 100644
--- llap-server/pom.xml
+++ llap-server/pom.xml
@@ -103,12 +103,6 @@
true
- org.apache.hadoop
- hadoop-yarn-registry
- ${hadoop.version}
- true
-
-
org.apache.tez
tez-runtime-internals
${tez.version}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 6f75001..98b1ccd 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -31,7 +31,7 @@
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
deleted file mode 100644
index f0f22aa..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstance.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry;
-
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public interface ServiceInstance {
-
- /**
- * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
- * back on the same host/port
- */
- public String getWorkerIdentity();
-
- /**
- * Hostname of the service instance
- *
- * @return
- */
- public String getHost();
-
- /**
- * RPC Endpoint for service instance
- *
- * @return
- */
- public int getRpcPort();
-
- /**
- * Shuffle Endpoint for service instance
- *
- * @return
- */
- public int getShufflePort();
-
- /**
- * Return the last known state (without refreshing)
- *
- * @return
- */
-
- public boolean isAlive();
-
- /**
- * Config properties of the Service Instance (llap.daemon.*)
- *
- * @return
- */
-
- public Map getProperties();
-
- /**
- * Memory and Executors available for the LLAP tasks
- *
- * This does not include the size of the cache or the actual vCores allocated via Slider.
- *
- * @return
- */
- public Resource getResource();
-}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
deleted file mode 100644
index 7ab36d4..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceInstanceSet.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-public interface ServiceInstanceSet {
-
- /**
- * Get an instance mapping which map worker identity to each instance.
- *
- * The worker identity does not collide between restarts, so each restart will have a unique id,
- * while having the same host/ip pair.
- *
- * @return
- */
- public Map getAll();
-
- /**
- * Get an instance by worker identity.
- *
- * @param name
- * @return
- */
- public ServiceInstance getInstance(String name);
-
- /**
- * Get a list of service instances for a given host.
- *
- * The list could include dead and alive instances.
- *
- * @param host
- * @return
- */
- public Set getByHost(String host);
-
- /**
- * Refresh the instance set from registry backing store.
- *
- * @throws IOException
- */
- public void refresh() throws IOException;
-
-}
\ No newline at end of file
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
deleted file mode 100644
index a0f9aac..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/ServiceRegistry.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry;
-
-import java.io.IOException;
-
-/**
- * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
- */
-public interface ServiceRegistry {
-
- /**
- * Start the service registry
- *
- * @throws InterruptedException
- */
- public void start() throws InterruptedException;
-
- /**
- * Stop the service registry
- *
- * @throws InterruptedException
- */
- public void stop() throws InterruptedException;
-
- /**
- * Register the current instance - the implementation takes care of the endpoints to register.
- *
- * @throws IOException
- */
- public void register() throws IOException;
-
- /**
- * Remove the current registration cleanly (implementation defined cleanup)
- *
- * @throws IOException
- */
- public void unregister() throws IOException;
-
- /**
- * Client API to get the list of instances registered via the current registry key.
- *
- * @param component
- * @return
- * @throws IOException
- */
- public ServiceInstanceSet getInstances(String component) throws IOException;
-}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
deleted file mode 100644
index 57aa1e7..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapFixedRegistryImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.log4j.Logger;
-
-public class LlapFixedRegistryImpl implements ServiceRegistry {
-
- private static final Logger LOG = Logger.getLogger(LlapFixedRegistryImpl.class);
-
- @InterfaceAudience.Private
- // This is primarily for testing to avoid the host lookup
- public static final String FIXED_REGISTRY_RESOLVE_HOST_NAMES = "fixed.registry.resolve.host.names";
-
- private final int port;
- private final int shuffle;
- private final String[] hosts;
- private final int memory;
- private final int vcores;
- private final boolean resolveHosts;
-
- private final Map srv = new HashMap();
-
- public LlapFixedRegistryImpl(String hosts, Configuration conf) {
- this.hosts = hosts.split(",");
- this.port =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
- this.shuffle =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
- LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
- this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
-
- for (Map.Entry kv : conf) {
- if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
- || kv.getKey().startsWith("hive.llap.")
- || kv.getKey().startsWith(LlapConfiguration.LLAP_PREFIX)) {
- // TODO: read this somewhere useful, like the task scheduler
- srv.put(kv.getKey(), kv.getValue());
- }
- }
-
- this.memory =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
- LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT);
- this.vcores =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
- }
-
- @Override
- public void start() throws InterruptedException {
- // nothing to start
- }
-
- @Override
- public void stop() throws InterruptedException {
- // nothing to stop
- }
-
- @Override
- public void register() throws IOException {
- // nothing to register
- }
-
- @Override
- public void unregister() throws IOException {
- // nothing to unregister
- }
-
- public static String getWorkerIdentity(String host) {
- // trigger clean errors for anyone who mixes up identity with hosts
- return "host-" + host;
- }
-
- private final class FixedServiceInstance implements ServiceInstance {
-
- private final String host;
-
- public FixedServiceInstance(String host) {
- if (resolveHosts) {
- try {
- InetAddress inetAddress = InetAddress.getByName(host);
- if (NetUtils.isLocalAddress(inetAddress)) {
- InetSocketAddress socketAddress = new InetSocketAddress(0);
- socketAddress = NetUtils.getConnectAddress(socketAddress);
- LOG.info("Adding host identified as local: " + host + " as "
- + socketAddress.getHostName());
- host = socketAddress.getHostName();
- }
- } catch (UnknownHostException e) {
- LOG.warn("Ignoring resolution issues for host: " + host, e);
- }
- }
- this.host = host;
- }
-
- @Override
- public String getWorkerIdentity() {
- return LlapFixedRegistryImpl.getWorkerIdentity(host);
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public int getRpcPort() {
- // TODO: allow >1 port per host?
- return LlapFixedRegistryImpl.this.port;
- }
-
- @Override
- public int getShufflePort() {
- return LlapFixedRegistryImpl.this.shuffle;
- }
-
- @Override
- public boolean isAlive() {
- return true;
- }
-
- @Override
- public Map getProperties() {
- Map properties = new HashMap<>(srv);
- // no worker identity
- return properties;
- }
-
- @Override
- public Resource getResource() {
- return Resource.newInstance(memory, vcores);
- }
-
- @Override
- public String toString() {
- return "FixedServiceInstance{" +
- "host=" + host +
- ", memory=" + memory +
- ", vcores=" + vcores +
- '}';
- }
- }
-
- private final class FixedServiceInstanceSet implements ServiceInstanceSet {
-
- private final Map instances = new HashMap();
-
- public FixedServiceInstanceSet() {
- for (String host : hosts) {
- // trigger bugs in anyone who uses this as a hostname
- instances.put(getWorkerIdentity(host), new FixedServiceInstance(host));
- }
- }
-
- @Override
- public Map getAll() {
- return instances;
- }
-
- @Override
- public ServiceInstance getInstance(String name) {
- return instances.get(name);
- }
-
- @Override
- public Set getByHost(String host) {
- Set byHost = new HashSet();
- ServiceInstance inst = getInstance(getWorkerIdentity(host));
- if (inst != null) {
- byHost.add(inst);
- }
- return byHost;
- }
-
- @Override
- public void refresh() throws IOException {
- // I will do no such thing
- }
-
- }
-
- @Override
- public ServiceInstanceSet getInstances(String component) throws IOException {
- return new FixedServiceInstanceSet();
- }
-
- @Override
- public String toString() {
- return String.format("FixedRegistry hosts=%s", StringUtils.join(",", this.hosts));
- }
-}
\ No newline at end of file
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
deleted file mode 100644
index d3647d0..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.log4j.Logger;
-
-public class LlapRegistryService extends AbstractService {
-
- private static final Logger LOG = Logger.getLogger(LlapRegistryService.class);
-
- private ServiceRegistry registry = null;
- private final boolean isDaemon;
-
- public LlapRegistryService(boolean isDaemon) {
- super("LlapRegistryService");
- this.isDaemon = isDaemon;
- }
-
- @Override
- public void serviceInit(Configuration conf) {
- String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
- if (hosts.startsWith("@")) {
- registry = new LlapYarnRegistryImpl(hosts.substring(1), conf, isDaemon);
- } else {
- registry = new LlapFixedRegistryImpl(hosts, conf);
- }
- LOG.info("Using LLAP registry type " + registry);
- }
-
-
- @Override
- public void serviceStart() throws Exception {
- if (this.registry != null) {
- this.registry.start();
- }
- if (isDaemon) {
- registerWorker();
- }
- }
-
- @Override
- public void serviceStop() throws Exception {
- if (isDaemon) {
- unregisterWorker();
- }
- if (this.registry != null) {
- this.registry.stop();
- } else {
- LOG.warn("Stopping non-existent registry service");
- }
- }
-
- private void registerWorker() throws IOException {
- if (this.registry != null) {
- this.registry.register();
- }
- }
-
- private void unregisterWorker() throws IOException {
- if (this.registry != null) {
- this.registry.unregister();
- }
- }
-
- public ServiceInstanceSet getInstances() throws IOException {
- return this.registry.getInstances("LLAP");
- }
-}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
deleted file mode 100644
index cb1b1d0..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapYarnRegistryImpl.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.registry.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceRegistry;
-import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
-import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
-import org.apache.hadoop.registry.client.types.AddressTypes;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.ProtocolTypes;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.log4j.Logger;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.zookeeper.CreateMode;
-
-import com.google.common.base.Preconditions;
-
-public class LlapYarnRegistryImpl implements ServiceRegistry {
-
- private static final Logger LOG = Logger.getLogger(LlapYarnRegistryImpl.class);
-
- private final RegistryOperationsService client;
- private final Configuration conf;
- private final ServiceRecordMarshal encoder;
- private final String path;
-
- private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
-
- private static final UUID uniq = UUID.randomUUID();
- private static final String hostname;
-
- private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
-
- private final static String SERVICE_CLASS = "org-apache-hive";
-
- final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
- final long refreshDelay;
- private final boolean isDaemon;
-
- static {
- String localhost = "localhost";
- try {
- localhost = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException uhe) {
- // ignore
- }
- hostname = localhost;
- }
-
- public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) {
-
- LOG.info("Llap Registry is enabled with registryid: " + instanceName);
- this.conf = new Configuration(conf);
- conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
- // registry reference
- client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
- encoder = new RegistryUtils.ServiceRecordMarshal();
- this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
- SERVICE_CLASS, instanceName, "workers"), "worker-");
- refreshDelay =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT);
- this.isDaemon = isDaemon;
- Preconditions.checkArgument(refreshDelay > 0,
- "Refresh delay for registry has to be positive = %d", refreshDelay);
- }
-
- public Endpoint getRpcEndpoint() {
- final int rpcPort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_RPC_PORT,
- LlapConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
- return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
- }
-
- public Endpoint getShuffleEndpoint() {
- final int shufflePort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
- LlapConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
- // HTTP today, but might not be
- return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
- shufflePort);
- }
-
- public Endpoint getServicesEndpoint() {
- final int servicePort =
- conf.getInt(LlapConfiguration.LLAP_DAEMON_SERVICE_PORT,
- LlapConfiguration.LLAP_DAEMON_SERVICE_PORT_DEFAULT);
- final boolean isSSL =
- conf.getBoolean(LlapConfiguration.LLAP_DAEMON_SERVICE_SSL,
- LlapConfiguration.LLAP_DAEMON_SERVICE_SSL_DEFAULT);
- final String scheme = isSSL ? "https" : "http";
- final URL serviceURL;
- try {
- serviceURL = new URL(scheme, hostname, servicePort, "");
- return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI());
- } catch (MalformedURLException e) {
- throw new TezUncheckedException(e);
- } catch (URISyntaxException e) {
- throw new TezUncheckedException("llap service URI for " + hostname + " is invalid", e);
- }
- }
-
- private final String getPath() {
- return this.path;
- }
-
- @Override
- public void register() throws IOException {
- String path = getPath();
- ServiceRecord srv = new ServiceRecord();
- srv.addInternalEndpoint(getRpcEndpoint());
- srv.addInternalEndpoint(getShuffleEndpoint());
- srv.addExternalEndpoint(getServicesEndpoint());
-
- for (Map.Entry kv : this.conf) {
- if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
- || kv.getKey().startsWith("hive.llap.")) {
- // TODO: read this somewhere useful, like the task scheduler
- srv.set(kv.getKey(), kv.getValue());
- }
- }
-
- // restart sensitive instance id
- srv.set(UNIQUE_IDENTIFIER, uniq.toString());
-
- client.mknode(RegistryPathUtils.parentOf(path), true);
-
- // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
- client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
- client.getClientAcls());
- }
-
- @Override
- public void unregister() throws IOException {
- // Nothing for the zkCreate models
- }
-
- private class DynamicServiceInstance implements ServiceInstance {
-
- private final ServiceRecord srv;
- private boolean alive = true;
- private final String host;
- private final int rpcPort;
- private final int shufflePort;
-
- public DynamicServiceInstance(ServiceRecord srv) throws IOException {
- this.srv = srv;
-
- final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
- final Endpoint rpc = srv.getInternalEndpoint("llap");
-
- this.host =
- RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
- AddressTypes.ADDRESS_HOSTNAME_FIELD);
- this.rpcPort =
- Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
- AddressTypes.ADDRESS_PORT_FIELD));
- this.shufflePort =
- Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
- AddressTypes.ADDRESS_PORT_FIELD));
- }
-
- @Override
- public String getWorkerIdentity() {
- return srv.get(UNIQUE_IDENTIFIER);
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public int getRpcPort() {
- return rpcPort;
- }
-
- @Override
- public int getShufflePort() {
- return shufflePort;
- }
-
- @Override
- public boolean isAlive() {
- return alive ;
- }
-
- public void kill() {
- // May be possible to generate a notification back to the scheduler from here.
- LOG.info("Killing service instance: " + this);
- this.alive = false;
- }
-
- @Override
- public Map getProperties() {
- return srv.attributes();
- }
-
- @Override
- public Resource getResource() {
- int memory = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
- int vCores = Integer.valueOf(srv.get(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS));
- return Resource.newInstance(memory, vCores);
- }
-
- @Override
- public String toString() {
- return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]";
- }
-
- // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
- // of an already known instance.
- }
-
- private class DynamicServiceInstanceSet implements ServiceInstanceSet {
-
- // LinkedHashMap to retain iteration order.
- private final Map instances = new LinkedHashMap<>();
-
- @Override
- public synchronized Map getAll() {
- // Return a copy. Instances may be modified during a refresh.
- return new LinkedHashMap<>(instances);
- }
-
- @Override
- public synchronized ServiceInstance getInstance(String name) {
- return instances.get(name);
- }
-
- @Override
- public void refresh() throws IOException {
- /* call this from wherever */
- Map freshInstances = new HashMap();
-
- String path = getPath();
- Map records =
- RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
- // Synchronize after reading the service records from the external service (ZK)
- synchronized (this) {
- Set latestKeys = new HashSet();
- LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
- for (ServiceRecord rec : records.values()) {
- ServiceInstance instance = new DynamicServiceInstance(rec);
- if (instance != null) {
- if (instances != null && instances.containsKey(instance.getWorkerIdentity()) == false) {
- // add a new object
- freshInstances.put(instance.getWorkerIdentity(), instance);
- if (LOG.isInfoEnabled()) {
- LOG.info("Adding new worker " + instance.getWorkerIdentity() + " which mapped to "
- + instance);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Retaining running worker " + instance.getWorkerIdentity() +
- " which mapped to " + instance);
- }
- }
- }
- latestKeys.add(instance.getWorkerIdentity());
- }
-
- if (instances != null) {
- // deep-copy before modifying
- Set oldKeys = new HashSet<>(instances.keySet());
- if (oldKeys.removeAll(latestKeys)) {
- // This is all the records which have not checked in, and are effectively dead.
- for (String k : oldKeys) {
- // this is so that people can hold onto ServiceInstance references as placeholders for tasks
- final DynamicServiceInstance dead = (DynamicServiceInstance) instances.get(k);
- dead.kill();
- if (LOG.isInfoEnabled()) {
- LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
- }
- }
- }
- // oldKeys contains the set of dead instances at this point.
- this.instances.keySet().removeAll(oldKeys);
- this.instances.putAll(freshInstances);
- } else {
- this.instances.putAll(freshInstances);
- }
- }
- }
-
- @Override
- public synchronized Set getByHost(String host) {
- // TODO Maybe store this as a map which is populated during construction, to avoid walking
- // the map on each request.
- Set byHost = new HashSet();
-
- for (ServiceInstance i : instances.values()) {
- if (host.equals(i.getHost())) {
- // all hosts in instances should be alive in this impl
- byHost.add(i);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Locality comparing " + host + " to " + i.getHost());
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
- }
- return byHost;
- }
- }
-
- @Override
- public ServiceInstanceSet getInstances(String component) throws IOException {
- Preconditions.checkArgument("LLAP".equals(component)); // right now there is only 1 component
- if (this.client != null) {
- instances.refresh();
- return instances;
- } else {
- Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized");
- return null;
- }
- }
-
- @Override
- public void start() {
- if (client == null) return;
- client.start();
- if (isDaemon) return;
- refresher.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- instances.refresh();
- } catch (IOException ioe) {
- LOG.warn("Could not refresh hosts during scheduled refresh", ioe);
- }
- }
- }, 0, refreshDelay, TimeUnit.SECONDS);
- }
-
- @Override
- public void stop() {
- if (client != null) {
- client.stop();
- }
- }
-}
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 7fb9a99..6fd01f9 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -51,9 +51,9 @@
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.daemon.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 2f93266..23724a4 100644
--- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -34,7 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapFixedRegistryImpl;
+import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;