diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 4148dc3..5575f6c 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; @@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenSecretManager; @@ -99,7 +102,9 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); - + private static final String RESOURCE_URI_STR = "/ws/v1/applicationhistory"; + private static final Joiner JOINER = Joiner.on(""); + private static final Joiner PATH_JOINER = Joiner.on("/"); private final ConcurrentMap credentialMap; // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG. @@ -114,6 +119,7 @@ private final Token token; private final String user; private String amHost; + private URI timelineServerUri; // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. @@ -185,6 +191,18 @@ public void initialize() throws Exception { + "fileCleanupDelay=" + deleteDelayOnDagComplete + ", numCommunicatorThreads=" + numThreads); this.communicator.init(conf); + if (YarnConfiguration.useHttps(conf)) { + timelineServerUri = URI + .create(JOINER.join("https://", conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), + RESOURCE_URI_STR)); + } else { + timelineServerUri = URI.create(JOINER.join("http://", conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), + RESOURCE_URI_STR)); + } } @Override @@ -508,16 +526,34 @@ public void indicateError(Throwable t) { @Override public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - // Not supported yet. - // Need support from YARN to link to an already aggregated log, or at least list them. - return null; + String url = ""; + if (timelineServerUri != null) { + LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); + BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); + ContainerId containerId = biMap.inverse().get(attemptID); + String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String filename = currentHiveQueryId + "-" + dagId + ".log"; + // YARN-6011 provides a webservice to get the logs + url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", + filename); + } + return url; } @Override public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - // Not supported yet. - // Need support from YARN to link to an already aggregated log, or at least list them. - return null; + String url = ""; + if (timelineServerUri != null) { + LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); + BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); + ContainerId containerId = biMap.inverse().get(attemptID); + String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String filename = currentHiveQueryId + "-" + dagId + ".log.done"; + // YARN-6011 provides a webservice to get the logs + url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", + filename); + } + return url; } private static class PingingNodeInfo {