diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 5a94db9..7017a68 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; @@ -57,17 +58,18 @@ public static synchronized LlapRegistryService getClient(Configuration conf) { String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined"); LlapRegistryService registry; - // TODO: this is not going to work with multiple users. if (hosts.startsWith("@")) { // Caching instances only in case of the YARN registry. Each host based list will get it's own copy. - String name = hosts.substring(1); - if (yarnRegistries.containsKey(name) && yarnRegistries.get(name).isInState(STATE.STARTED)) { - registry = yarnRegistries.get(name); + String appName = hosts.substring(1); + String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); + String key = appName + "-" + userName; + if (yarnRegistries.containsKey(key) && yarnRegistries.get(key).isInState(STATE.STARTED)) { + registry = yarnRegistries.get(key); } else { registry = new LlapRegistryService(false); registry.init(conf); registry.start(); - yarnRegistries.put(name, registry); + yarnRegistries.put(key, registry); } } else { registry = new LlapRegistryService(false); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 893b7d9..e7b2ad3 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.tezplugins; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; @@ -149,7 +150,6 @@ public LlapTaskCommunicator( Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); // Not closing this at the moment at shutdown, since this could be a shared instance. - // TODO: this is unused. serviceRegistry = LlapRegistryService.getClient(conf); umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); @@ -540,37 +540,56 @@ public void indicateError(Throwable t) { @Override public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); - } - } - return url; + return constructLogUrl(attemptID, containerNodeId); } @Override public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log.done"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); + String url = constructLogUrl(attemptID, containerNodeId); + if (url == null) { + return url; + } + return url + ".done"; + } + + private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId) { + Set instanceSet; + try { + instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); + } catch (IOException e) { + // Not failing the job due to a failure constructing the log url + LOG.warn( + "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", + containerNodeId, e.getMessage()); + return null; + } + if (instanceSet != null) { + ServiceInstance matchedInstance = null; + for (ServiceInstance instance : instanceSet) { + if (instance.getRpcPort() == containerNodeId.getPort()) { + matchedInstance = instance; + break; + } + } + if (matchedInstance != null) { + String containerIdString = matchedInstance.getProperties() + .get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + if (containerIdString != null) { + return constructLlapLogUrl(attemptID, containerIdString); + } } } + return null; + } + + private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString) { + String url = null; + if (timelineServerUri != null) { + String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String filename = currentHiveQueryId + "-" + dagId + ".log"; + url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerIdString, "logs", + filename); + } return url; }