diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 5a94db9..610c0a5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; @@ -57,17 +58,17 @@ public static synchronized LlapRegistryService getClient(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined"); LlapRegistryService registry; - // TODO: this is not going to work with multiple users. if (hosts.startsWith("@")) { // Caching instances only in case of the YARN registry. Each host based list will get it's own copy. - String name = hosts.substring(1); - if (yarnRegistries.containsKey(name) && yarnRegistries.get(name).isInState(STATE.STARTED)) { - registry = yarnRegistries.get(name); - } else { + String appName = hosts.substring(1); + String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); + String key = appName + "-" + userName; + registry = yarnRegistries.get(key); + if (registry == null || !registry.isInState(STATE.STARTED)) { registry = new LlapRegistryService(false); registry.init(conf); registry.start(); - yarnRegistries.put(name, registry); + yarnRegistries.put(key, registry); } } else { registry = new LlapRegistryService(false); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 893b7d9..f1c307d 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.tezplugins; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; @@ -120,6 +121,7 @@ private final String user; private String amHost; private URI timelineServerUri; + private String nmWebAppAddress; // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. @@ -149,7 +151,6 @@ public LlapTaskCommunicator( Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); // Not closing this at the moment at shutdown, since this could be a shared instance. - // TODO: this is unused. serviceRegistry = LlapRegistryService.getClient(conf); umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); @@ -203,6 +204,7 @@ public void initialize() throws Exception { YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), RESOURCE_URI_STR)); } + this.nmWebAppAddress = conf.get(YarnConfiguration.NM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS); } @Override @@ -540,37 +542,54 @@ public void indicateError(Throwable t) { @Override public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); - } - } - return url; + return constructLogUrl(attemptID, containerNodeId, false); } @Override public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log.done"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); + return constructLogUrl(attemptID, containerNodeId, true); + } + + private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId, final boolean isDone) { + if (timelineServerUri == null) { + return null; + } + Set instanceSet; + try { + instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); + } catch (IOException e) { + // Not failing the job due to a failure constructing the log url + LOG.warn( + "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", + containerNodeId, e.getMessage()); + return null; + } + if (instanceSet != null) { + ServiceInstance matchedInstance = null; + for (ServiceInstance instance : instanceSet) { + if (instance.getRpcPort() == containerNodeId.getPort()) { + matchedInstance = instance; + break; + } + } + if (matchedInstance != null) { + String containerIdString = matchedInstance.getProperties() + .get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + if (containerIdString != null) { + return constructLlapLogUrl(attemptID, containerIdString, isDone); + } } } + return null; + } + + private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString, + final boolean isDone) { + String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""), + "?nm.id=", nmWebAppAddress); + String url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerIdString, "logs", + filename); return url; }