diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 5a94db9..610c0a5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; @@ -57,17 +58,17 @@ public static synchronized LlapRegistryService getClient(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined"); LlapRegistryService registry; - // TODO: this is not going to work with multiple users. if (hosts.startsWith("@")) { // Caching instances only in case of the YARN registry. Each host based list will get it's own copy. - String name = hosts.substring(1); - if (yarnRegistries.containsKey(name) && yarnRegistries.get(name).isInState(STATE.STARTED)) { - registry = yarnRegistries.get(name); - } else { + String appName = hosts.substring(1); + String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); + String key = appName + "-" + userName; + registry = yarnRegistries.get(key); + if (registry == null || !registry.isInState(STATE.STARTED)) { registry = new LlapRegistryService(false); registry.init(conf); registry.start(); - yarnRegistries.put(name, registry); + yarnRegistries.put(key, registry); } } else { registry = new LlapRegistryService(false); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 893b7d9..bfd2def 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -14,12 +14,12 @@ package org.apache.hadoop.hive.llap.tezplugins; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; @@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenSecretManager; @@ -119,7 +119,8 @@ private final Token token; private final String user; private String amHost; - private URI timelineServerUri; + private String timelineServerUri; + private int nmPort; // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. @@ -149,7 +150,6 @@ public LlapTaskCommunicator( Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); // Not closing this at the moment at shutdown, since this could be a shared instance. - // TODO: this is unused. serviceRegistry = LlapRegistryService.getClient(conf); umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); @@ -191,18 +191,10 @@ public void initialize() throws Exception { + "fileCleanupDelay=" + deleteDelayOnDagComplete + ", numCommunicatorThreads=" + numThreads); this.communicator.init(conf); - if (YarnConfiguration.useHttps(conf)) { - timelineServerUri = URI - .create(JOINER.join("https://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), - RESOURCE_URI_STR)); - } else { - timelineServerUri = URI.create(JOINER.join("http://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), - RESOURCE_URI_STR)); - } + String scheme = WebAppUtils.getHttpSchemePrefix(conf); + String ahsUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + this.timelineServerUri = WebAppUtils.getURLWithScheme(scheme, ahsUrl); + this.nmPort = Integer.valueOf(WebAppUtils.getNMWebAppURLWithoutScheme(conf).split(":")[1]); } @Override @@ -540,37 +532,54 @@ public void indicateError(Throwable t) { @Override public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); - } - } - return url; + return constructLogUrl(attemptID, containerNodeId, false); } @Override public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log.done"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); + return constructLogUrl(attemptID, containerNodeId, true); + } + + private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId, final boolean isDone) { + if (timelineServerUri == null) { + return null; + } + Set instanceSet; + try { + instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); + } catch (IOException e) { + // Not failing the job due to a failure constructing the log url + LOG.warn( + "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", + containerNodeId, e.getMessage()); + return null; + } + if (instanceSet != null) { + ServiceInstance matchedInstance = null; + for (ServiceInstance instance : instanceSet) { + if (instance.getRpcPort() == containerNodeId.getPort()) { + matchedInstance = instance; + break; + } + } + if (matchedInstance != null) { + String containerIdString = matchedInstance.getProperties() + .get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + if (containerIdString != null) { + return constructLlapLogUrl(attemptID, containerIdString, isDone, containerNodeId.getHost()); + } } } + return null; + } + + private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString, + final boolean isDone, final String nmHost) { + String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""), + "?nm.id=", nmHost, ":", nmPort); + String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers", + containerIdString, "logs", filename); return url; }