diff --git llap-server/pom.xml llap-server/pom.xml
index 7b17f17..22ed693 100644
--- llap-server/pom.xml
+++ llap-server/pom.xml
@@ -165,6 +165,12 @@
test
+ org.apache.hadoop
+ hadoop-yarn-registry
+ ${hadoop-23.version}
+ true
+
+
org.apache.tez
tez-runtime-internals
${tez.version}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
index 2f405d9..0554c32 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonConfiguration.java
@@ -29,7 +29,7 @@ public LlapDaemonConfiguration() {
}
- private static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
+ public static final String LLAP_DAEMON_PREFIX = "llap.daemon.";
private static final String LLAP_DAEMON_SITE = "llap-daemon-site.xml";
@@ -40,6 +40,7 @@ public LlapDaemonConfiguration() {
public static final String LLAP_DAEMON_WORK_DIRS = LLAP_DAEMON_PREFIX + "work.dirs";
public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port";
+ public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551;
// Section for configs used in AM and executors
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 2bf2ed9..0ff255c 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
@@ -49,6 +50,7 @@
private final Configuration shuffleHandlerConf;
private final LlapDaemonProtocolServerImpl server;
private final ContainerRunnerImpl containerRunner;
+ private final LlapRegistryService registry;
private final AtomicLong numSubmissions = new AtomicLong(0);
private JvmPauseMonitor pauseMonitor;
private final ObjectName llapDaemonInfoBean;
@@ -128,6 +130,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, shufflePort, address,
executorMemoryBytes, metrics);
+
+ this.registry = new LlapRegistryService();
}
private void printAsciiArt() {
@@ -148,6 +152,7 @@ private void printAsciiArt() {
public void serviceInit(Configuration conf) {
server.init(conf);
containerRunner.init(conf);
+ registry.init(conf);
LlapIoProxy.setDaemon(true);
LlapIoProxy.initializeLlapIo(conf);
}
@@ -157,6 +162,8 @@ public void serviceStart() throws Exception {
ShuffleHandler.initializeAndStart(shuffleHandlerConf);
server.start();
containerRunner.start();
+ registry.start();
+ registry.registerWorker();
}
public void serviceStop() throws Exception {
@@ -164,6 +171,8 @@ public void serviceStop() throws Exception {
shutdown();
containerRunner.stop();
server.stop();
+ registry.unregisterWorker();
+ registry.stop();
ShuffleHandler.shutdown();
}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
new file mode 100644
index 0000000..33cd7eb
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/registry/impl/LlapRegistryService.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hive.llap.daemon.registry.impl;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+
+import com.google.common.base.Preconditions;
+
+public class LlapRegistryService extends AbstractService {
+
+ private static final Logger LOG = Logger.getLogger(LlapRegistryService.class);
+
+ public final static String SERVICE_CLASS = "org-apache-hive";
+
+ private RegistryOperationsService client;
+ private String instanceName;
+ private Configuration conf;
+ private ServiceRecordMarshal encoder;
+
+ private static final String hostname;
+
+ static {
+ String localhost = "localhost";
+ try {
+ localhost = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException uhe) {
+ // ignore
+ }
+ hostname = localhost;
+ }
+
+ public LlapRegistryService() {
+ super("LlapRegistryService");
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ String registryId = conf.getTrimmed(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+ if (registryId.startsWith("@")) {
+ LOG.info("Llap Registry is enabled with registryid: " + registryId);
+ this.conf = new Configuration(conf);
+ conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+ // registry reference
+ instanceName = registryId.substring(1);
+ client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
+ encoder = new RegistryUtils.ServiceRecordMarshal();
+
+ } else {
+ LOG.info("Llap Registry is disabled");
+ }
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ if (client != null) {
+ client.start();
+ }
+ }
+
+ public void serviceStop() throws Exception {
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ public Endpoint getRpcEndpoint() {
+ final int rpcPort =
+ conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
+ LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
+
+ return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
+ }
+
+ public Endpoint getShuffleEndpoint() {
+ final int shufflePort =
+ conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT,
+ LlapDaemonConfiguration.LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT);
+ // HTTP today, but might not be
+ return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
+ shufflePort);
+ }
+
+ private final String getPath() {
+ return RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(),
+ SERVICE_CLASS, instanceName, "workers"), "worker-");
+ }
+
+ public void registerWorker() throws IOException {
+ if (this.client != null) {
+ String path = getPath();
+ ServiceRecord srv = new ServiceRecord();
+ srv.addInternalEndpoint(getRpcEndpoint());
+ srv.addInternalEndpoint(getShuffleEndpoint());
+
+ for (Map.Entry kv : this.conf) {
+ if (kv.getKey().startsWith(LlapDaemonConfiguration.LLAP_DAEMON_PREFIX)) {
+ // TODO: read this somewhere useful, like the allocator
+ srv.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ client.mknode(RegistryPathUtils.parentOf(path), true);
+
+ // FIXME: YARN registry needs to expose Ephemeral_Seq nodes & return the paths
+ client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, encoder.toBytes(srv),
+ client.getClientAcls());
+ }
+ }
+
+ public void unregisterWorker() {
+ if (this.client != null) {
+ // with ephemeral nodes, there's nothing to do here
+ // because the create didn't return paths
+ }
+ }
+
+ public Map getWorkers() throws IOException {
+ if (this.client != null) {
+ String path = getPath();
+ return RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
+ } else {
+ Preconditions.checkNotNull(this.client, "Yarn registry client is not intialized");
+ return null;
+ }
+ }
+}
diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 0af5ea4..fc8df6b 100644
--- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,12 +29,17 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -46,7 +52,9 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
@@ -78,8 +86,7 @@
// Per Executor Thread
private final Resource resourcePerExecutor;
- // TODO: replace with service registry
- private final YarnClient yc = YarnClient.createYarnClient();
+ private final LlapRegistryService registry = new LlapRegistryService();
public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
String clientHostname, int clientPort, String trackingUrl,
@@ -109,17 +116,17 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a
int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
- String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
- if (hosts == null || hosts.length == 0) {
- hosts = new String[]{"localhost"};
- serviceHosts.add("localhost");
- serviceHostSet.add("localhost");
- } else if (!hosts[0].equals("*")) {
- for (String host : hosts) {
- serviceHosts.add(host);
- serviceHostSet.add(host);
+ String instanceId = conf.getTrimmed(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+
+ if (instanceId == null || false == instanceId.startsWith("@")) {
+ String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+ if (hosts == null || hosts.length == 0) {
+ hosts = new String[] { "localhost" };
+ serviceHosts.add("localhost");
+ serviceHostSet.add("localhost");
}
}
+
this.containerPort = conf.getInt(LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT,
LlapDaemonConfiguration.LLAP_DAEMON_RPC_PORT_DEFAULT);
@@ -145,27 +152,30 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a
@Override
public void serviceInit(Configuration conf) {
- yc.init(conf);
+ registry.init(conf);
}
@Override
public void serviceStart() {
- yc.start();
+ registry.start();
if (serviceHosts.size() > 0) {
return;
}
- LOG.info("Evaluating host usage criteria for service nodes");
+ LOG.info("Reading YARN registry for service records");
try {
- List nodes = yc.getNodeReports(NodeState.RUNNING);
- for (NodeReport nd : nodes) {
- Resource used = nd.getUsed();
- LOG.info("Examining node: " + nd);
- if (nd.getNodeState() == NodeState.RUNNING
- && used.getMemory() >= memoryPerInstance) {
- // TODO: fix this with YARN registry
- serviceHosts.add(nd.getNodeId().getHost());
- serviceHostSet.add(nd.getNodeId().getHost());
+ Map workers = registry.getWorkers();
+ for (ServiceRecord srv : workers.values()) {
+ Endpoint rpc = srv.getInternalEndpoint("llap");
+ if (rpc != null) {
+ LOG.info("Examining endpoint: " + rpc);
+ final String host =
+ RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+ AddressTypes.ADDRESS_HOSTNAME_FIELD);
+ serviceHosts.add(host);
+ serviceHostSet.add(host);
+ } else {
+ LOG.info("The SRV record was " + srv);
}
}
LOG.info("Re-inited with configuration: " +
@@ -174,10 +184,8 @@ public void serviceStart() {
", executorsPerInstance=" + executorsPerInstance +
", resourcePerInstanceInferred=" + resourcePerExecutor +
", hosts="+ serviceHosts.toString());
- } catch (IOException e) {
- e.printStackTrace();
- } catch (YarnException e) {
- e.printStackTrace();
+ } catch (IOException ioe) {
+ throw new TezUncheckedException(ioe);
}
}
diff --git llap-server/src/main/resources/llap.py llap-server/src/main/resources/llap.py
index 65c0e9a..fa3bb48 100644
--- llap-server/src/main/resources/llap.py
+++ llap-server/src/main/resources/llap.py
@@ -44,6 +44,7 @@ def start(self, env):
# location containing llap-daemon-site.xml, tez and yarn configuration xmls as well.
os.environ['LLAP_DAEMON_CONF_DIR'] = format("{app_root}/conf/")
os.environ['LLAP_DAEMON_LOG_DIR'] = format("{app_log_dir}/")
+ os.environ['LLAP_DAEMON_LOGGER'] = format("{app_log_level}")
os.environ['LLAP_DAEMON_HEAPSIZE'] = format("{memory_val}")
os.environ['LLAP_DAEMON_PID_DIR'] = dirname(format("{pid_file}"))
os.environ['LLAP_DAEMON_LD_PATH'] = format('{library_path}')
diff --git llap-server/src/main/resources/params.py llap-server/src/main/resources/params.py
index 8706d04..8aa8e3c 100644
--- llap-server/src/main/resources/params.py
+++ llap-server/src/main/resources/params.py
@@ -29,6 +29,7 @@
additional_cp = config['configurations']['global']['additional_cp']
app_log_dir = config['configurations']['global']['app_log_dir']
+app_log_level = config['configurations']['global']['app_log_level']
port = config['configurations']['global']['listen_port']
memory_val = config['configurations']['global']['memory_val']
diff --git llap-server/src/main/resources/templates.py llap-server/src/main/resources/templates.py
index 382ef17..e3599c0 100644
--- llap-server/src/main/resources/templates.py
+++ llap-server/src/main/resources/templates.py
@@ -73,6 +73,7 @@
"java_home": "%(java_home)s",
"site.global.app_user": "yarn",
"site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
+ "site.global.app_log_level": "INFO,RFA",
"site.global.additional_cp": "%(hadoop_home)s",
"site.global.library_path": "%(hadoop_home)s/lib/native",
"site.global.memory_val": "%(heap)d",
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1f9ff94..2fc76cf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -179,7 +179,7 @@ public void open(HiveConf conf, String[] additionalFiles)
commonLocalResources.put(utils.getBaseName(lr), lr);
}
- if("llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
+ if ("llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
// add configs for llap-daemon-site.xml + localize llap jars
// they cannot be referred to directly as it would be a circular depedency
conf.addResource("llap-daemon-site.xml");
@@ -193,6 +193,17 @@ public void open(HiveConf conf, String[] additionalFiles)
} catch (ClassNotFoundException ce) {
throw new IOException("Cannot find LlapInputFormat in the classpath", ce);
}
+
+ try {
+ final File registryJar =
+ new File(Utilities.jarFinderGetJar(Class
+ .forName("org.apache.hadoop.registry.client.api.RegistryOperations")));
+ final LocalResource registryLr =
+ createJarLocalResource(registryJar.toURI().toURL().toExternalForm());
+ commonLocalResources.put(utils.getBaseName(registryLr), registryLr);
+ } catch (ClassNotFoundException ce) {
+ throw new IOException("Cannot find Hadoop Registry in the classpath", ce);
+ }
}
// Create environment for AM.
@@ -402,7 +413,6 @@ private LocalResource createJarLocalResource(String localJarPath)
return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
}
-
private String getSha(Path localFile) throws IOException, IllegalArgumentException {
InputStream is = null;
try {