diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java index 4ba8fe0b377..b9a2caa6b33 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java @@ -69,6 +69,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.logaggregation.UserLogsRequest; import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.LogServlet; @@ -423,6 +424,29 @@ public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters( return new JobTaskAttemptCounterInfo(ta); } + @GET + @Path("/userlogs") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + @InterfaceAudience.Public + @InterfaceStability.Unstable + public Response getAggregatedLogsMeta(@Context HttpServletRequest hsr, + @QueryParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String fileName, + @QueryParam(YarnWebServiceParams.FILESIZE) String fileSize, + @QueryParam(YarnWebServiceParams.MODIFICATION_TIME) String modificationTime, + @QueryParam(YarnWebServiceParams.APP_ID) String appIdStr, + @QueryParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId) throws IOException { + init(); + UserLogsRequest logsRequest = new UserLogsRequest(); + logsRequest.setAppId(appIdStr); + logsRequest.setFileName(fileName); + logsRequest.setContainerId(containerIdStr); + logsRequest.setFileSize(fileSize); + logsRequest.setModificationTime(modificationTime); + logsRequest.setNodeId(nmId); + return logServlet.getContainerLogsInfo(hsr, logsRequest); + } + @GET @Path("/aggregatedlogs") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index b51be9af14d..9af75086fe9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -29,10 +29,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.annotations.VisibleForTesting; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.NoSuchElementException; @Private public class LogAggregationUtils { @@ -295,18 +298,7 @@ public static String getNodeString(String nodeId) { // Return both new and old node files combined RemoteIterator curDir = nodeFilesCur; RemoteIterator prevDir = nodeFilesPrev; - RemoteIterator nodeFilesCombined = new - RemoteIterator() { - @Override - public boolean hasNext() throws IOException { - return prevDir.hasNext() || curDir.hasNext(); - } - - @Override - public FileStatus next() throws IOException { - return prevDir.hasNext() ? prevDir.next() : curDir.next(); - } - }; + RemoteIterator nodeFilesCombined = combineIterators(prevDir, curDir); return nodeFilesCombined; } } @@ -368,4 +360,92 @@ public FileStatus next() throws IOException { return nodeFiles; } + public static RemoteIterator getRemoteFiles( + Configuration conf, Path appPath) throws IOException { + + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(appPath); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(appPath); + } + + public static RemoteIterator getUserRemoteLogDir( + Configuration conf, String user, Path remoteRootLogDir, + String remoteRootLogDirSuffix) throws IOException { + Path userPath = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + final RemoteIterator userRootDirFiles = + getRemoteFiles(conf, userPath); + + RemoteIterator newDirs = new RemoteIterator() { + private RemoteIterator currentBucketDir = + LogAggregationUtils.getSubDir(conf, userRootDirFiles); + @Override + public boolean hasNext() throws IOException { + return currentBucketDir != null && currentBucketDir.hasNext() || + userRootDirFiles.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + FileStatus next = null; + while (next == null) { + if (currentBucketDir != null && currentBucketDir.hasNext()) { + next = currentBucketDir.next(); + } else if (userRootDirFiles.hasNext()) { + currentBucketDir = LogAggregationUtils.getSubDir( + conf, userRootDirFiles); + } else { + throw new NoSuchElementException(); + } + } + return next; + } + }; + + RemoteIterator allDir = newDirs; + if (LogAggregationUtils.isOlderPathEnabled(conf)) { + try { + Path oldPath = LogAggregationUtils.getOlderRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + final RemoteIterator oldUserRootDirFiles = + getRemoteFiles(conf, oldPath); + allDir = combineIterators(oldUserRootDirFiles, newDirs); + } catch (FileNotFoundException e) { + return newDirs; + } + } + + return allDir; + } + + private static RemoteIterator getSubDir( + Configuration conf, RemoteIterator rootDir) throws IOException { + if (rootDir.hasNext()) { + Path userPath = rootDir.next().getPath(); + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(userPath); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(userPath); + } else { + return null; + } + } + + private static RemoteIterator combineIterators( + RemoteIterator first, RemoteIterator second) { + return new RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return first.hasNext() || second.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return first.hasNext() ? first.next() : second.next(); + } + }; + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/UserLogsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/UserLogsRequest.java new file mode 100644 index 00000000000..a7bbd4c8709 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/UserLogsRequest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import java.util.function.Predicate; +import java.util.regex.Pattern; + +public class UserLogsRequest { + private String user; + private MatchExpression appId; + private MatchExpression containerId; + private MatchExpression nodeId; + private MatchExpression fileName; + private ComparisonExpression fileSize; + private ComparisonExpression modificationTime; + + public static class ComparisonExpression { + private static final String GREATER_SIGN = ">"; + private static final String LESSER_SIGN = "<"; + + private String expression; + private Predicate comparisonFn; + private Long convertedValue; + + public ComparisonExpression(String expression) { + if (expression == null) { + return; + } + + if (expression.startsWith(GREATER_SIGN)) { + convertedValue = Long.parseLong(expression.substring(1)); + comparisonFn = a -> a > convertedValue; + } else if (expression.startsWith(LESSER_SIGN)) { + convertedValue = Long.parseLong(expression.substring(1)); + comparisonFn = a -> a < convertedValue; + } else { + convertedValue = Long.parseLong(expression); + comparisonFn = a -> a.equals(convertedValue); + } + + this.expression = expression; + } + + public boolean compare(String value) { + return compare(Long.valueOf(value), true); + } + + public boolean compare(Long value) { + return compare(value, true); + } + + public boolean compare(Long value, boolean defaultValue) { + if (expression == null) { + return defaultValue; + } else { + return comparisonFn.test(value); + } + } + + @Override + public String toString() { + return convertedValue != null ? String.valueOf(convertedValue) : ""; + } + } + + public static class MatchExpression { + private Pattern expression; + + public MatchExpression(String expression) { + this.expression = expression != null ? Pattern.compile(expression) : null; + } + + public boolean compare(String value) { + return expression == null || expression.matcher(value).matches(); + } + + @Override + public String toString() { + return expression != null ? expression.pattern() : ""; + } + } + + public String getUser() { + return user; + } + + public MatchExpression getAppId() { + return appId; + } + + public MatchExpression getContainerId() { + return containerId; + } + + public MatchExpression getNodeId() { + return nodeId; + } + + public MatchExpression getFileName() { + return fileName; + } + + public ComparisonExpression getFileSize() { + return fileSize; + } + + public ComparisonExpression getModificationTime() { + return modificationTime; + } + + public void setUser(String user) { + this.user = user; + } + + public void setAppId(String appId) { + this.appId = new MatchExpression(appId); + } + + public void setContainerId(String containerId) { + this.containerId = new MatchExpression(containerId); + } + + public void setNodeId(String nodeId) { + this.nodeId = new MatchExpression(nodeId); + } + + public void setFileName(String fileName) { + this.fileName = new MatchExpression(fileName); + } + + public void setFileSize(String fileSize) { + this.fileSize = new ComparisonExpression(fileSize); + } + + public void setModificationTime(String modificationTime) { + this.modificationTime = new ComparisonExpression(modificationTime); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index ab6eb613209..c9ea33b582f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.UserLogsRequest; import org.apache.hadoop.yarn.webapp.View.ViewContext; import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; import org.slf4j.Logger; @@ -218,6 +219,18 @@ public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest, public abstract List readAggregatedLogsMeta( ContainerLogsRequest logRequest) throws IOException; + /** + * Return a list of {@link ContainerLogMeta} for a user + * from Remote FileSystem. + * + * @param logRequest {@link ContainerLogsRequest} + * @return a list of {@link ContainerLogMeta} + * @throws IOException if there is no available log file + */ + public List readUserAggregatedLogsMeta( + UserLogsRequest logRequest) throws IOException { + return Collections.emptyList(); + } /** * Render Aggregated Logs block. * @param html the html diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index 2355d306403..b18b9434316 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.logaggregation.UserLogsRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.math3.util.Pair; @@ -258,6 +259,106 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, return findLogs; } + @Override + public List readUserAggregatedLogsMeta( + UserLogsRequest logRequest) throws IOException { + List containersLogMeta = new ArrayList<>(); + RemoteIterator appDirs = LogAggregationUtils. + getUserRemoteLogDir(conf, logRequest.getUser(), + remoteRootLogDir, remoteRootLogDirSuffix); + + while (appDirs.hasNext()) { + FileStatus currentAppDir = appDirs.next(); + if (logRequest.getAppId().compare(currentAppDir.getPath().getName())) { + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteFiles(conf, currentAppDir.getPath()); + + while (nodeFiles.hasNext()) { + FileStatus currentNodeFile = nodeFiles.next(); + + if (currentNodeFile.getPath().getName().equals( + logRequest.getAppId() + ".har")) { + Path p = new Path("har:///" + + currentNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + + collectLogMeta(logRequest, containersLogMeta, currentNodeFile); + } + } + + } + return containersLogMeta; + } + + private void collectLogMeta( + UserLogsRequest logRequest, List containersLogMeta, + FileStatus currentNodeFile) throws IOException { + Path nodePath = currentNodeFile.getPath(); + + if (logRequest.getNodeId().compare(nodePath.getName()) && + !nodePath.getName().endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + LogReader reader = + new LogReader(conf, + nodePath); + try { + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null) { + if (logRequest.getContainerId().compare(key.toString())) { + ContainerLogMeta containerLogMeta = new ContainerLogMeta( + key.toString(), nodePath.getName()); + + collectContainerLogFilesMeta(logRequest, currentNodeFile, + valueStream, containerLogMeta); + if (!containerLogMeta.getContainerLogMeta().isEmpty()) { + containersLogMeta.add(containerLogMeta); + } + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); + } + } + } + + private void collectContainerLogFilesMeta( + UserLogsRequest logRequest, FileStatus currentNodeFile, + DataInputStream valueStream, ContainerLogMeta containerLogMeta) + throws IOException { + while (true) { + try { + Pair logMeta = + LogReader.readContainerMetaDataAndSkipData( + valueStream); + + boolean isFileNameMatches = logRequest.getFileName() + .compare(logMeta.getFirst()); + boolean fileSizeComparison = logRequest.getFileSize() + .compare(logMeta.getSecond()); + boolean modificationTimeComparison = logRequest.getModificationTime() + .compare(currentNodeFile.getModificationTime()); + + if (!isFileNameMatches || !fileSizeComparison || + !modificationTimeComparison) { + continue; + } + + containerLogMeta.addLogMeta( + logMeta.getFirst(), + logMeta.getSecond(), + Times.format(currentNodeFile.getModificationTime())); + } catch (EOFException eof) { + break; + } + } + } + @Override public List readAggregatedLogsMeta( ContainerLogsRequest logRequest) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java index 33de8df0111..608252966a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java @@ -24,11 +24,14 @@ import com.sun.jersey.api.client.UniformInterfaceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.UserLogsRequest; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.util.Apps; @@ -45,6 +48,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; @@ -216,6 +220,29 @@ public Response getLogsInfo(HttpServletRequest hsr, String appIdStr, redirectedFromNode, null, manualRedirection); } + public Response getContainerLogsInfo(HttpServletRequest req, + UserLogsRequest logsRequest) throws IOException { + List logs = new ArrayList<>(); + if (logsRequest.getUser() == null) { + logsRequest.setUser(UserGroupInformation.getCurrentUser().getUserName()); + } + + for (LogAggregationFileController fc : getOrCreateFactory().getConfiguredLogAggregationFileControllerList()) { + logs.addAll(fc.readUserAggregatedLogsMeta(logsRequest)); + } + List containersLogsInfo = convertToContainerLogsInfo( + logs, false); + GenericEntity> meta = + new GenericEntity>(containersLogsInfo) { + }; + Response.ResponseBuilder response = Response.ok(meta); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } + /** * Returns information about the logs for a specific container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java index 0d9e9f68c10..efa1e11b50f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java @@ -39,4 +39,6 @@ String REDIRECTED_FROM_NODE = "redirected_from_node"; String CLUSTER_ID = "clusterid"; String MANUAL_REDIRECTION = "manual_redirection"; + String FILESIZE = "file_size"; + String MODIFICATION_TIME = "modification_time"; }