diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index a4f5d4d..b4b041a 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; @@ -108,6 +111,8 @@ private final ConcurrentMap knownNodeMap = new ConcurrentHashMap<>(); private final ConcurrentMap pingedNodeMap = new ConcurrentHashMap<>(); + private final LlapRegistryService serviceRegistry; + private volatile int currentDagId; private volatile QueryIdentifierProto currentQueryIdentifierProto; @@ -129,6 +134,9 @@ public LlapTaskCommunicator( } Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); + // Not closing this at the moment at shutdown, since this could be a shared instance. + serviceRegistry = LlapRegistryService.getClient(conf); + umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder(); @@ -455,6 +463,45 @@ public void indicateError(Throwable t) { }); } + // @Override - TODO Add the annotation after upgrading Hive to Tez 0.8.4 + public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { + return constructLogUrl(containerNodeId); + } + + // @Override - TODO Add the annotation after upgrading Hive to Tez 0.8.4 + public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { + return constructLogUrl(containerNodeId); + } + + private String constructLogUrl(NodeId containerNodeId) { + Set instanceSet = null; + try { + instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); + } catch (IOException e) { + // Not failing the job due to a failure constructing the log url + LOG.warn( + "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", + containerNodeId, e.getMessage()); + return null; + } + if (instanceSet != null) { + ServiceInstance matchedInstance = null; + for (ServiceInstance instance : instanceSet) { + if (instance.getRpcPort() == containerNodeId.getPort()) { + matchedInstance = instance; + break; + } + } + if (matchedInstance != null) { + return constructLlapLogUrl(matchedInstance); + } + } + return null; + } + + private String constructLlapLogUrl(ServiceInstance serviceInstance) { + return serviceInstance.getServicesAddress() + "/logs"; + } private static class PingingNodeInfo { final AtomicLong logTimestamp;