diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 1da6e23..8a7239c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -24,41 +24,30 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Map; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; -import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Times; -import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; -import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.util.AggregatedLogLimits; +import 
org.apache.hadoop.yarn.webapp.util.AggregatedLogsBlockRender; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import com.google.inject.Inject; +/** + * A Block of HTML that will render a given aggregated log. + */ @InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) public class AggregatedLogsBlock extends HtmlBlock { private final Configuration conf; @Inject - AggregatedLogsBlock(Configuration conf) { + AggregatedLogsBlock(ViewContext ctx, Configuration conf) { + super(ctx); this.conf = conf; } @@ -67,7 +56,7 @@ protected void render(Block html) { ContainerId containerId = verifyAndGetContainerId(html); NodeId nodeId = verifyAndGetNodeId(html); String appOwner = verifyAndGetAppOwner(html); - LogLimits logLimits = verifyAndGetLogLimits(html); + AggregatedLogLimits logLimits = verifyAndGetLogLimits(html); if (containerId == null || nodeId == null || appOwner == null || appOwner.isEmpty() || logLimits == null) { return; @@ -94,192 +83,10 @@ protected void render(Block html) { return; } - Path remoteRootLogDir = new Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, applicationId, appOwner, - LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)); - RemoteIterator nodeFiles; - try { - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified( - remoteAppDir); - nodeFiles = - FileContext.getFileContext(qualifiedLogDir.toUri(), conf) - .listStatus(remoteAppDir); - } catch (FileNotFoundException fnf) { - html.h1() - ._("Logs not available for " + logEntity - + ". 
Aggregation may not be complete, " - + "Check back later or try the nodemanager at " + nodeId)._(); - if(nmApplicationLogUrl != null) { - html.h1() - ._("Or see application log at " + nmApplicationLogUrl) - ._(); - } - return; - } catch (Exception ex) { - html.h1() - ._("Error getting logs at " + nodeId)._(); - return; - } - - boolean foundLog = false; - String desiredLogType = $(CONTAINER_LOG_TYPE); - try { - while (nodeFiles.hasNext()) { - AggregatedLogFormat.LogReader reader = null; - try { - FileStatus thisNodeFile = nodeFiles.next(); - if (thisNodeFile.getPath().getName().equals(applicationId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (!thisNodeFile.getPath().getName() - .contains(LogAggregationUtils.getNodeString(nodeId)) - || thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - continue; - } - long logUploadedTime = thisNodeFile.getModificationTime(); - reader = - new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); - - String owner = null; - Map appAcls = null; - try { - owner = reader.getApplicationOwner(); - appAcls = reader.getApplicationAcls(); - } catch (IOException e) { - LOG.error("Error getting logs for " + logEntity, e); - continue; - } - ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf); - aclsManager.addApplication(applicationId, appAcls); - - String remoteUser = request().getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - if (callerUGI != null && !aclsManager.checkAccess(callerUGI, - ApplicationAccessType.VIEW_APP, owner, applicationId)) { - html.h1() - ._("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity - + " in log file [" + thisNodeFile.getPath().getName() + "]")._(); - LOG.error("User [" + remoteUser - + "] 
is not authorized to view the logs for " + logEntity); - continue; - } - - AggregatedLogFormat.ContainerLogsReader logReader = reader - .getContainerLogsReader(containerId); - if (logReader == null) { - continue; - } - - foundLog = readContainerLogs(html, logReader, logLimits, - desiredLogType, logUploadedTime); - } catch (IOException ex) { - LOG.error("Error getting logs for " + logEntity, ex); - continue; - } finally { - if (reader != null) - reader.close(); - } - } - if (!foundLog) { - if (desiredLogType.isEmpty()) { - html.h1("No logs available for container " + containerId.toString()); - } else { - html.h1("Unable to locate '" + desiredLogType - + "' log for container " + containerId.toString()); - } - } - } catch (IOException e) { - html.h1()._("Error getting logs for " + logEntity)._(); - LOG.error("Error getting logs for " + logEntity, e); - } - } - - private boolean readContainerLogs(Block html, - AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits, - String desiredLogType, long logUpLoadTime) throws IOException { - int bufferSize = 65536; - char[] cbuf = new char[bufferSize]; - - boolean foundLog = false; - String logType = logReader.nextLog(); - while (logType != null) { - if (desiredLogType == null || desiredLogType.isEmpty() - || desiredLogType.equals(logType)) { - long logLength = logReader.getCurrentLogLength(); - if (foundLog) { - html.pre()._("\n\n")._(); - } - - html.p()._("Log Type: " + logType)._(); - html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._(); - html.p()._("Log Length: " + Long.toString(logLength))._(); - - long start = logLimits.start < 0 - ? logLength + logLimits.start : logLimits.start; - start = start < 0 ? 0 : start; - start = start > logLength ? logLength : start; - long end = logLimits.end < 0 - ? logLength + logLimits.end : logLimits.end; - end = end < 0 ? 0 : end; - end = end > logLength ? logLength : end; - end = end < start ? 
start : end; - - long toRead = end - start; - if (toRead < logLength) { - html.p()._("Showing " + toRead + " bytes of " + logLength - + " total. Click ") - .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), - $(ENTITY_STRING), $(APP_OWNER), - logType, "?start=0"), "here"). - _(" for the full log.")._(); - } - - long totalSkipped = 0; - while (totalSkipped < start) { - long ret = logReader.skip(start - totalSkipped); - if (ret == 0) { - //Read one byte - int nextByte = logReader.read(); - // Check if we have reached EOF - if (nextByte == -1) { - throw new IOException( "Premature EOF from container log"); - } - ret = 1; - } - totalSkipped += ret; - } - - int len = 0; - int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - PRE pre = html.pre(); - - while (toRead > 0 - && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { - pre._(new String(cbuf, 0, len)); - toRead = toRead - len; - currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - } - - pre._(); - foundLog = true; - } - - logType = logReader.nextLog(); - } - - return foundLog; + AggregatedLogsBlockRender render = new AggregatedLogsBlockRender( + this.context(), applicationId, appOwner, nodeId, containerId); + render.renderAggregatedLogsBlock(html, conf, $(CONTAINER_LOG_TYPE), + logLimits, logEntity, nmApplicationLogUrl, request().getRemoteUser()); } private ContainerId verifyAndGetContainerId(Block html) { @@ -325,14 +132,11 @@ private String verifyAndGetAppOwner(Block html) { return appOwner; } - private static class LogLimits { - long start; - long end; - } - - private LogLimits verifyAndGetLogLimits(Block html) { + private AggregatedLogLimits verifyAndGetLogLimits(Block html) { long start = -4096; long end = Long.MAX_VALUE; + long timestampStart = 0; + long timestampEnd = Long.MAX_VALUE; boolean isValid = true; String startStr = $("start"); @@ -355,13 +159,38 @@ private LogLimits verifyAndGetLogLimits(Block html) { } } + String timestampStartStr = $("timestampstart"); + if 
(timestampStartStr != null && !timestampStartStr.isEmpty()) { + try { + timestampStart = Long.parseLong(timestampStartStr); + } catch (NumberFormatException e) { + isValid = false; + html.h1()._("Invalid timestamp start value: " + timestampStartStr)._(); + } + } + + String timestampEndStr = $("timestampend"); + if (timestampEndStr != null && !timestampEndStr.isEmpty()) { + try { + timestampEnd = Long.parseLong(timestampEndStr); + } catch (NumberFormatException e) { + isValid = false; + html.h1()._("Invalid timestamp end value: " + timestampEndStr)._(); + } + } + + if (timestampStart > timestampEnd) { + isValid = false; + html.h1()._("Invalid timestampstart and timestampend values. " + + "timestampstart: [" + timestampStart + "]" + + ", timestampend[" + timestampEnd + "]")._(); + } if (!isValid) { return null; } - LogLimits limits = new LogLimits(); - limits.start = start; - limits.end = end; + AggregatedLogLimits limits = new AggregatedLogLimits(start, end, + timestampStart, timestampEnd); return limits; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/AggregatedLogLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/AggregatedLogLimits.java new file mode 100644 index 0000000..6d65cba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/AggregatedLogLimits.java @@ -0,0 +1,60 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.webapp.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Entities used to filter the aggregated log. + */ +@Private +@Evolving +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) +public class AggregatedLogLimits { + private final long start; + private final long end; + private final long timestampStart; + private final long timestampEnd; + + public AggregatedLogLimits(long start, long end, + long timestampStart, long timestampEnd) { + this.start = start; + this.end = end; + this.timestampStart = timestampStart; + this.timestampEnd = timestampEnd; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public long getTimestampStart() { + return timestampStart; + } + + public long getTimestampEnd() { + return timestampEnd; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/AggregatedLogsBlockRender.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/AggregatedLogsBlockRender.java new file mode 100644 index 0000000..016d1ce --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/AggregatedLogsBlockRender.java @@ -0,0 +1,293 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license 
agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.webapp.util; + +import com.google.inject.Inject; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+/** + * + * Renders a block for the aggregated log. + */ +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) +public class AggregatedLogsBlockRender extends HtmlBlock { + public static final Logger LOG = LoggerFactory.getLogger( + AggregatedLogsBlockRender.class); + + private final ApplicationId appId; + private final String appOwner; + private final ContainerId containerId; + private final NodeId nodeId; + + @Inject + public AggregatedLogsBlockRender(ViewContext ctx, ApplicationId appId, + String appOwner, NodeId nodeId, ContainerId containerId) { + super(ctx); + this.appId = appId; + this.appOwner = appOwner; + this.nodeId = nodeId; + this.containerId = containerId; + } + + public void renderAggregatedLogsBlock(Block html, Configuration conf, + String logType, AggregatedLogLimits logLimits, + String logEntity, String nmApplicationLogUrl, String remoteUser) { + Path remoteRootLogDir = new Path(conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogDir, appId, appOwner, + LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)); + RemoteIterator nodeFiles; + try { + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified( + remoteAppDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), conf) + .listStatus(remoteAppDir); + } catch (FileNotFoundException fnf) { + html.h1() + ._("Logs not available for " + logEntity + + ". 
Aggregation may not be complete, " + + "Check back later or try the nodemanager at " + nodeId)._(); + if(nmApplicationLogUrl != null) { + html.h1() + ._("Or see application log at " + nmApplicationLogUrl) + ._(); + } + return; + } catch (Exception ex) { + html.h1() + ._("Error getting logs at " + nodeId)._(); + return; + } + + boolean foundLog = false; + String desiredLogType = logType; + try { + while (nodeFiles.hasNext()) { + AggregatedLogFormat.LogReader reader = null; + try { + FileStatus thisNodeFile = nodeFiles.next(); + if (thisNodeFile.getPath().getName().equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + if (!thisNodeFile.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + || thisNodeFile.getPath().getName() + .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + continue; + } + long timestamp = thisNodeFile.getModificationTime(); + if (timestamp < logLimits.getTimestampStart() + || timestamp > logLimits.getTimestampEnd()) { + continue; + } + long logUploadedTime = thisNodeFile.getModificationTime(); + reader = + new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); + + String owner = null; + Map appAcls = null; + try { + owner = reader.getApplicationOwner(); + appAcls = reader.getApplicationAcls(); + } catch (IOException e) { + LOG.error("Error getting logs for " + logEntity, e); + continue; + } + ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf); + aclsManager.addApplication(appId, appAcls); + + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + if (callerUGI != null && !aclsManager.checkAccess(callerUGI, + ApplicationAccessType.VIEW_APP, owner, appId)) { + html.h1() + ._("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity + + " in log file [" + 
thisNodeFile.getPath().getName() + "]")._(); + LOG.error("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity); + continue; + } + + AggregatedLogFormat.ContainerLogsReader logReader = reader + .getContainerLogsReader(containerId); + if (logReader == null) { + continue; + } + + foundLog = readContainerLogs(html, logReader, logLimits, + desiredLogType, logEntity, logUploadedTime); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + continue; + } finally { + if (reader != null) + reader.close(); + } + } + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } + } + } catch (IOException e) { + html.h1()._("Error getting logs for " + logEntity)._(); + LOG.error("Error getting logs for " + logEntity, e); + } + } + + private boolean readContainerLogs(Block html, + AggregatedLogFormat.ContainerLogsReader logReader, + AggregatedLogLimits logLimits, String desiredLogType, + String logEntity, long logUpLoadTime) throws IOException { + int bufferSize = 65536; + char[] cbuf = new char[bufferSize]; + + boolean foundLog = false; + String logType = logReader.nextLog(); + while (logType != null) { + if (desiredLogType == null || desiredLogType.isEmpty() + || desiredLogType.equals(logType)) { + long logLength = logReader.getCurrentLogLength(); + if (foundLog) { + html.pre()._("\n\n")._(); + } + html.p()._("Log Type: " + logType)._(); + html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._(); + html.p()._("Log Length: " + Long.toString(logLength))._(); + + long start = logLimits.getStart() < 0 + ? logLength + logLimits.getStart() : logLimits.getStart(); + start = start < 0 ? 0 : start; + start = start > logLength ? logLength : start; + long end = logLimits.getEnd() < 0 + ? 
logLength + logLimits.getEnd() : logLimits.getEnd(); + end = end < 0 ? 0 : end; + end = end > logLength ? logLength : end; + end = end < start ? start : end; + + long toRead = end - start; + if (toRead < logLength) { + createURLforFullLog(html, toRead, logLength, logType, logEntity, + logLimits); + } + + long totalSkipped = 0; + while (totalSkipped < start) { + long ret = logReader.skip(start - totalSkipped); + if (ret == 0) { + //Read one byte + int nextByte = logReader.read(); + // Check if we have reached EOF + if (nextByte == -1) { + throw new IOException( "Premature EOF from container log"); + } + ret = 1; + } + totalSkipped += ret; + } + + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + PRE pre = html.pre(); + + while (toRead > 0 + && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { + pre._(new String(cbuf, 0, len)); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + } + + pre._(); + foundLog = true; + } + + logType = logReader.nextLog(); + } + + return foundLog; + } + + public void createURLforFullLog(Block html, long toRead, long logLength, + String logType, String logEntity, AggregatedLogLimits logLimits) { + html.p()._("Showing " + toRead + " bytes of " + logLength + + " total. 
Click ").a(url("logs", this.nodeId.toString(), + this.containerId.toString(), logEntity, this.appOwner, logType, + "?start=0&timestampstart=" + logLimits.getTimestampStart() + + "&timestampend=" + logLimits.getTimestampEnd()), "here") + ._(" for the full log.")._(); + } + + public ApplicationId getAppId() { + return appId; + } + + public String getAppOwner() { + return appOwner; + } + + public ContainerId getContainerId() { + return containerId; + } + + public NodeId getNodeId() { + return nodeId; + } + + @Override + protected void render(Block html) { + // DO Nothing + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 1e71b3c..1802c27 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -30,7 +30,6 @@ import java.util.Map; import javax.servlet.http.HttpServletRequest; - import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; @@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.webapp.View.ViewContext; import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest; import org.apache.hadoop.yarn.webapp.view.BlockForTest; @@ -268,8 +268,9 @@ private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( String nodeName) { HttpServletRequest request = mock(HttpServletRequest.class); 
when(request.getRemoteUser()).thenReturn(user); + ViewContext mockView = mock(ViewContext.class); AggregatedLogsBlockForTest aggregatedBlock = new AggregatedLogsBlockForTest( - configuration); + mockView, configuration); aggregatedBlock.setRequest(request); aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID, containerId); aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME, nodeName); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlockForTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlockForTest.java index 57e6c81..188f38f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlockForTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlockForTest.java @@ -29,8 +29,9 @@ final private Map params = new HashMap(); private HttpServletRequest request; - public AggregatedLogsBlockForTest(Configuration conf) { - super(conf); + + public AggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) { + super(ctx, conf); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java index 3e5f4d2..dff3679 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java @@ -29,28 +29,43 @@ 
import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; +import java.util.Comparator; +import java.util.Date; import java.util.List; - import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.SubView; import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE; +import org.apache.hadoop.yarn.webapp.util.AggregatedLogLimits; +import org.apache.hadoop.yarn.webapp.util.AggregatedLogsBlockRender; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.inject.Inject; public class ContainerLogsPage extends NMView { - + + public static final Logger LOG = LoggerFactory.getLogger( + ContainerLogsPage.class); + public static final String REDIRECT_URL = "redirect.url"; - + @Override protected void preHead(Page.HTML<_> html) { String redirectUrl = $(REDIRECT_URL); if (redirectUrl == null || redirectUrl.isEmpty()) { @@ -86,13 +101,15 @@ protected void render(Block html) { String redirectUrl = $(REDIRECT_URL); if (redirectUrl !=null && redirectUrl.equals("false")) { html.h1("Failed while trying to 
construct the redirect url to the log" + - " server. Log Server url may not be configured"); + " server. Log Server url may not be configured"); //Intentional fallthrough. } ContainerId containerId; + ApplicationId appId; try { containerId = ContainerId.fromString($(CONTAINER_ID)); + appId = containerId.getApplicationAttemptId().getApplicationId(); } catch (IllegalArgumentException ex) { html.h1("Invalid container ID: " + $(CONTAINER_ID)); return; @@ -100,22 +117,52 @@ protected void render(Block html) { try { if ($(CONTAINER_LOG_TYPE).isEmpty()) { + html.h2("Local Logs:"); List logFiles = ContainerLogsUtils.getContainerLogDirs(containerId, request().getRemoteUser(), nmContext); - printLogFileDirectory(html, logFiles); + printLocalLogFileDirectory(html, logFiles); + // print out the aggregated logs if exists + try { + List containersLogMeta = LogToolUtils + .getContainerLogMetaFromRemoteFS(nmContext.getConf(), + containerId.getApplicationAttemptId().getApplicationId(), + containerId.toString(), nmContext.getNodeId().toString(), + $(APP_OWNER)); + if (containersLogMeta != null && !containersLogMeta.isEmpty()) { + html.h2("Aggregated Logs:"); + printAggregatedLogFileDirectory(html, containersLogMeta); + } + } catch (Exception ex) { + if (LOG.isDebugEnabled()) { + LOG.debug(ex.getMessage()); + } + } } else { - File logFile = ContainerLogsUtils.getContainerLogFile(containerId, - $(CONTAINER_LOG_TYPE), request().getRemoteUser(), nmContext); - printLogFile(html, logFile); + String aggregationType = $("log.aggregation.type"); + if (aggregationType == null || aggregationType.isEmpty() || + aggregationType.trim().toLowerCase().equals("local")) { + File logFile = ContainerLogsUtils.getContainerLogFile(containerId, + $(CONTAINER_LOG_TYPE), request().getRemoteUser(), nmContext); + printLocalLogFile(html, logFile); + } else if (aggregationType.trim().toLowerCase() + .equals("aggregated")) { + printAggregatedLogFile(html, appId, containerId, + $(CONTAINER_LOG_TYPE)); + } else { 
+ html.h1("Invalid value for query parameter: log.aggregation.type. " + + "The valid value could be either local or aggregated."); + } } } catch (YarnException ex) { html.h1(ex.getMessage()); } catch (NotFoundException ex) { html.h1(ex.getMessage()); + } catch (IOException ex) { + html.h1(ex.getMessage()); } } - private void printLogFile(Block html, File logFile) { + private void printLocalLogFile(Block html, File logFile) { long start = $("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start")); start = start < 0 ? logFile.length() + start : start; @@ -184,8 +231,75 @@ private void printLogFile(Block html, File logFile) { } } } - - private void printLogFileDirectory(Block html, List containerLogsDirs) { + + private void printAggregatedLogFile(Block html, ApplicationId appId, + ContainerId containerId, String logType) throws IOException { + NMAggregatedLogsBlockRender block = new NMAggregatedLogsBlockRender( + this.context(), appId, $(APP_OWNER), this.nmContext.getNodeId(), + containerId); + + long start = -4096; + long end = Long.MAX_VALUE; + long timestampStart = 0; + long timestampEnd = Long.MAX_VALUE; + String startStr = $("start"); + if (startStr != null && !startStr.isEmpty()) { + try { + start = Long.parseLong(startStr); + } catch (NumberFormatException e) { + html.h1()._("Invalid log start value: " + startStr)._(); + return; + } + } + + String endStr = $("end"); + if (endStr != null && !endStr.isEmpty()) { + try { + end = Long.parseLong(endStr); + } catch (NumberFormatException e) { + html.h1()._("Invalid log end value: " + endStr)._(); + return; + } + } + + String timestampStartStr = $("timestampstart"); + if (timestampStartStr != null && !timestampStartStr.isEmpty()) { + try { + timestampStart = Long.parseLong(timestampStartStr); + } catch (NumberFormatException e) { + html.h1()._("Invalid timestamp start value: " + + timestampStartStr)._(); + return; + } + } + + String timestampEndStr = $("timestampend"); + if (timestampEndStr != null && 
!timestampEndStr.isEmpty()) { + try { + timestampEnd = Long.parseLong(timestampEndStr); + } catch (NumberFormatException e) { + html.h1()._("Invalid timestamp end value: " + timestampEndStr)._(); + return; + } + } + + if (timestampStart > timestampEnd) { + html.h1()._("Invalid timestampstart and timestampend values. " + + "timestampstart: [" + timestampStart + "]" + + ", timestampend[" + timestampEnd + "]")._(); + return; + } + + AggregatedLogLimits limits = new AggregatedLogLimits(start, end, + timestampStart, timestampEnd); + + String remoteUser = request().getRemoteUser(); + block.renderAggregatedLogsBlock(html, this.nmContext.getConf(), logType, + limits, null, null, remoteUser); + } + + private void printLocalLogFileDirectory(Block html, + List containerLogsDirs) { // Print out log types in lexical order Collections.sort(containerLogsDirs); boolean foundLogFile = false; @@ -208,5 +322,83 @@ private void printLogFileDirectory(Block html, List containerLogsDirs) { return; } } + + private void printAggregatedLogFileDirectory(Block html, + List containersLogMeta) throws ParseException { + List filesInfo = new ArrayList<>(); + for (ContainerLogMeta logMeta : containersLogMeta) { + filesInfo.addAll(logMeta.getContainerLogMeta()); + } + + //sort the list, so we could list the log file in order. 
+ Collections.sort(filesInfo, new Comparator() { + @Override + public int compare(PerContainerLogFileInfo o1, + PerContainerLogFileInfo o2) { + return createAggregatedLogFileName(o1.getFileName(), + o1.getLastModifiedTime()).compareTo( + createAggregatedLogFileName(o2.getFileName(), + o2.getLastModifiedTime())); + } + }); + + boolean foundLogFile = false; + for (PerContainerLogFileInfo fileInfo : filesInfo) { + long timestamp = convertDateToTimeStamp(fileInfo.getLastModifiedTime()); + foundLogFile = true; + String fileName = createAggregatedLogFileName(fileInfo.getFileName(), + fileInfo.getLastModifiedTime()); + html.p().a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER), + fileInfo.getFileName(), + "?start=-4096&log.aggregation.type=aggregated&timestampstart=" + + timestamp + "&timestampend=" + (timestamp + 1000)), + fileName + " : Total file length is " + + fileInfo.getFileSize() + " bytes.")._(); + } + + if (!foundLogFile) { + html.h4("No aggregated logs available for container " + + $(CONTAINER_ID)); + return; + } + } + + private String createAggregatedLogFileName(String fileName, + String modificationTime) { + return fileName + "_" + modificationTime; + } + + private long convertDateToTimeStamp(String dateTime) + throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat( + "EEE MMM dd HH:mm:ss Z yyyy"); + Date d = sdf.parse(dateTime); + + Calendar c = Calendar.getInstance(); + c.setTime(d); + return c.getTimeInMillis(); + } + + private static class NMAggregatedLogsBlockRender + extends AggregatedLogsBlockRender { + + @Inject + NMAggregatedLogsBlockRender(ViewContext ctx, ApplicationId appId, + String appOwner, NodeId nodeId, ContainerId containerId) { + super(ctx, appId, appOwner, nodeId, containerId); + } + + @Override + public void createURLforFullLog(Block html, long toRead, long logLength, + String logType, String logEntity, AggregatedLogLimits logLimits) { + html.p()._("Showing " + toRead + " bytes. 
Click ").a( + url("containerlogs", this.getContainerId().toString(), + this.getAppOwner(), logType, "?start=0&log.aggregation.type" + + "=aggregated&timestampstart=" + + logLimits.getTimestampStart() + "&timestampend=" + + logLimits.getTimestampEnd()), "here"). + _(" for full log")._(); + } + } } -} +} \ No newline at end of file