diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index a648fef..331e6a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -46,6 +46,7 @@ *
  • Optional, application-specific binary service data.
  • *
  • Environment variables for the launched process.
  • *
  • Command to launch the container.
  • + *
  • Optional, application-specific {@link LogContext}.
  • * *

    * @@ -73,6 +74,18 @@ public static ContainerLaunchContext newInstance( return container; } + @Public + @Stable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, LogContext logContext) { + ContainerLaunchContext container = newInstance(localResources, environment, + commands, serviceData, tokens, acls); + container.setLogContext(logContext); + return container; + } /** * Get all the tokens needed by this container. It may include file-system * tokens, ApplicationMaster related tokens if this container is an @@ -196,4 +209,23 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get LogContext of the application + * + * @return LogContext of the application + */ + @Public + @Stable + public abstract LogContext getLogContext(); + + /** + * Set LogContext for the application + * + * @param logContext + * for the application + */ + @Public + @Stable + public abstract void setLogContext(LogContext logContext); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java new file mode 100644 index 0000000..4d994e6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import com.google.common.base.Preconditions; + +/** + *

    LogContext represents all of the + * information needed by the NodeManager to handle + * the logs for an application.

    + * + *

    It includes details such as: + *

      + *
    • logIncludePatterns. It defines include patterns which are used to + * filter the log files. The log files which match the include + * patterns will be uploaded.
    • + *
    • logExcludePatterns. It defines exclude patterns which are used to + * filter the log files. The log files which match the exclude + * patterns will not be uploaded. If any patterns are defined in + * both logIncludePatterns and logExcludePatterns, those files matched + * with these patterns will not be uploaded.
    • + *
    • logInterval. The default value is 0. By default, + * the logAggregationService only uploads container logs when + * the application is finished. This configuration defines + * how often the logAggregationService uploads container logs, in days. + * By setting this configuration, the logAggregationService can upload container + * logs periodically while the application is running. + *
    • + *
    + *

    + * + * @see ContainerLaunchContext + */ +public abstract class LogContext { + + @Public + @Unstable + public static LogContext newInstance(Set logIncludePatterns, + Set logExcludePatterns, long logInterval) { + Preconditions.checkArgument(logInterval >= 0); + LogContext context = Records.newRecord(LogContext.class); + context.setLogIncludePatterns(logIncludePatterns); + context.setLogExcludePatterns(logExcludePatterns); + context.setLogInterval(logInterval); + return context; + } + + /** + * Get include patterns + * @return set of include patterns + */ + @Public + @Stable + public abstract Set getLogIncludePatterns(); + + /** + * Set include patterns + * @param includePatterns to set + */ + @Public + @Stable + public abstract void setLogIncludePatterns(Set includePatterns); + + /** + * Get exclude patterns + * @return set of exclude patterns + */ + @Public + @Stable + public abstract Set getLogExcludePatterns(); + + /** + * Set exclude patterns + * @param excludePatterns to set + */ + @Public + @Stable + public abstract void setLogExcludePatterns(Set excludePatterns); + + /** + * Get LogInterval per day + * @return the logInterval + */ + @Public + @Stable + public abstract long getLogInterval(); + + /** + * Set LogInterval per day + * @param logInterval + */ + @Public + @Stable + public abstract void setLogInterval(long logInterval); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d8c42cc..35b2b60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -294,6 +294,12 @@ message ApplicationSubmissionContextProto { optional int64 attempt_failures_validity_interval = 13 [default = -1]; } +message LogContextProto { + repeated string log_include_pattern = 1; + repeated string log_exclude_pattern = 2; + optional 
int64 log_monitor_interval = 3 [default = 0]; +} + enum ApplicationAccessTypeProto { APPACCESS_VIEW_APP = 1; APPACCESS_MODIFY_APP = 2; @@ -344,6 +350,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional LogContextProto log_context = 7; } message ContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index eb6169c..720a2fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,8 +38,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -113,17 +110,16 @@ public int run(String[] args) throws Exception { System.err.println("Invalid ApplicationId specified"); return -1; } - + try { int resultCode = verifyApplicationState(appId); if (resultCode != 0) { - System.out.println("Application has not completed." 
+ - " Logs are only available after an application completes"); + System.out.println("Logs are not avaiable right now."); return resultCode; } } catch (Exception e) { System.err.println("Unable to get ApplicationState." + - " Attempting to fetch logs directly from the filesystem."); + " Attempting to fetch logs directly from the filesystem."); } LogCLIHelpers logCliHelper = new LogCLIHelpers(); @@ -141,18 +137,9 @@ public int run(String[] args) throws Exception { printHelpMessage(printOpts); resultCode = -1; } else { - Path remoteRootLogDir = - new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - LogAggregationUtils.getRemoteNodeLogFileForApp( - remoteRootLogDir, - appId, - appOwner, - ConverterUtils.toNodeId(nodeAddress), - LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()))); - resultCode = logCliHelper.dumpAContainerLogs(containerIdStr, reader, System.out); + resultCode = + logCliHelper.dumpAContainersLogs(appIdStr, containerIdStr, + nodeAddress, appOwner); } return resultCode; @@ -167,10 +154,10 @@ private int verifyApplicationState(ApplicationId appId) throws IOException, switch (appReport.getYarnApplicationState()) { case NEW: case NEW_SAVING: + return -1; case ACCEPTED: case SUBMITTED: case RUNNING: - return -1; case FAILED: case FINISHED: case KILLED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..0373ef2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -30,10 +30,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; @@ -56,6 +58,7 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; + private LogContext logContext = null; public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); @@ -120,6 +123,9 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.logContext != null) { + builder.setLogContext(convertToProtoFormat(this.logContext)); + } } private void mergeLocalToProto() { @@ -463,4 +469,34 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } + + + private LogContextPBImpl convertFromProtoFormat(LogContextProto p) { + return new LogContextPBImpl(p); + } + + private LogContextProto convertToProtoFormat(LogContext t) { + return ((LogContextPBImpl)t).getProto(); + } + + @Override + public LogContext getLogContext() { + 
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.logContext != null) { + return this.logContext; + } // Else via proto + if (!p.hasLogContext()) { + return null; + } + logContext = convertFromProtoFormat(p.getLogContext()); + return logContext; + } + + @Override + public void setLogContext(LogContext logContext) { + maybeInitBuilder(); + if (logContext == null) + builder.clearLogContext(); + this.logContext = logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java new file mode 100644 index 0000000..73ccf71 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.LogContext; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProtoOrBuilder; + +import com.google.common.base.Preconditions; +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class LogContextPBImpl extends LogContext{ + LogContextProto proto = + LogContextProto.getDefaultInstance(); + LogContextProto.Builder builder = null; + boolean viaProto = false; + + private Set logIncludePatterns = null; + private Set logExcludePatterns = null; + + public LogContextPBImpl() { + builder = LogContextProto.newBuilder(); + } + + public LogContextPBImpl(LogContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public LogContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.logIncludePatterns != null && !this.logIncludePatterns.isEmpty()) { + builder.clearLogIncludePattern(); + builder.addAllLogIncludePattern(this.logIncludePatterns); + } + + if (this.logExcludePatterns != null && !this.logExcludePatterns.isEmpty()) { + builder.clearLogExcludePattern(); + builder.addAllLogExcludePattern(this.logExcludePatterns); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LogContextProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public Set getLogIncludePatterns() { + initLogIncludePatterns(); + return this.logIncludePatterns; + } + + @Override + public void setLogIncludePatterns(Set logPattern) { + maybeInitBuilder(); + if (logPattern == null || logPattern.isEmpty()) { + builder.clearLogIncludePattern(); + this.logIncludePatterns = null; + return; + } + + this.logIncludePatterns = new HashSet(); + this.logIncludePatterns.addAll(logPattern); + } + + private void initLogIncludePatterns() { + if (this.logIncludePatterns != null) { + return; + } + LogContextProtoOrBuilder p = viaProto ? 
proto : builder; + this.logIncludePatterns = new HashSet(); + this.logIncludePatterns.addAll(p.getLogIncludePatternList()); + } + + @Override + public Set getLogExcludePatterns() { + initLogExcludePatterns(); + return this.logExcludePatterns; + } + + @Override + public void setLogExcludePatterns(Set logPattern) { + maybeInitBuilder(); + if (logPattern == null || logPattern.isEmpty()) { + builder.clearLogExcludePattern(); + this.logExcludePatterns = null; + return; + } + + this.logExcludePatterns = new HashSet(); + this.logExcludePatterns.addAll(logPattern); + } + + private void initLogExcludePatterns() { + if (this.logExcludePatterns != null) { + return; + } + LogContextProtoOrBuilder p = viaProto ? proto : builder; + this.logExcludePatterns = new HashSet(); + this.logExcludePatterns.addAll(p.getLogExcludePatternList()); + } + + @Override + public long getLogInterval() { + LogContextProtoOrBuilder p = viaProto ? proto : builder; + return p.getLogMonitorInterval(); + } + + @Override + public void setLogInterval(long logInterval) { + Preconditions.checkArgument(logInterval >= 0); + maybeInitBuilder(); + builder.setLogMonitorInterval(logInterval); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 3568de2..e74948e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -24,6 +24,7 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; +import java.io.FileFilter; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; @@ -35,10 +36,12 @@ import 
java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; - +import java.util.Set; +import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,10 +63,15 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + @Public @Evolving public class AggregatedLogFormat { @@ -149,6 +157,10 @@ public String toString() { private final List rootLogDirs; private final ContainerId containerId; private final String user; + private final LogContext logContext; + private Set uploadedFiles = new HashSet(); + private final Set alreadyUploadedLogFiles; + private Set allExistingFileMeta = new HashSet(); // TODO Maybe add a version string here. 
Instead of changing the version of // the entire k-v format @@ -160,9 +172,24 @@ public LogValue(List rootLogDirs, ContainerId containerId, // Ensure logs are processed in lexical order Collections.sort(this.rootLogDirs); + this.logContext = null; + this.alreadyUploadedLogFiles = new HashSet(); } - public void write(DataOutputStream out) throws IOException { + public LogValue(List rootLogDirs, ContainerId containerId, + String user, LogContext logContext, + Set alreadyUploadedLogFiles) { + this.rootLogDirs = new ArrayList(rootLogDirs); + this.containerId = containerId; + this.user = user; + + // Ensure logs are processed in lexical order + Collections.sort(this.rootLogDirs); + this.logContext = logContext; + this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; + } + + private int numOflogFiles() { for (String rootLogDir : this.rootLogDirs) { File appLogDir = new File(rootLogDir, @@ -177,61 +204,131 @@ public void write(DataOutputStream out) throws IOException { continue; // ContainerDir may have been deleted by the user. 
} - // Write out log files in lexical order - File[] logFiles = containerLogDir.listFiles(); - Arrays.sort(logFiles); - for (File logFile : logFiles) { - - final long fileLength = logFile.length(); - - // Write the logFile Type - out.writeUTF(logFile.getName()); - - // Write the log length as UTF so that it is printable - out.writeUTF(String.valueOf(fileLength)); - - // Write the log itself - FileInputStream in = null; - try { - in = SecureIOUtils.openForRead(logFile, getUser(), null); - byte[] buf = new byte[65535]; - int len = 0; - long bytesLeft = fileLength; - while ((len = in.read(buf)) != -1) { - //If buffer contents within fileLength, write - if (len < bytesLeft) { - out.write(buf, 0, len); - bytesLeft-=len; - } - //else only write contents within fileLength, then exit early - else { - out.write(buf, 0, (int)bytesLeft); - break; - } - } - long newLength = logFile.length(); - if(fileLength < newLength) { - LOG.warn("Aggregated logs truncated by approximately "+ - (newLength-fileLength) +" bytes."); + this.uploadedFiles.addAll(getFilteredLogFiles(containerLogDir)); + } + return this.uploadedFiles.size(); + } + + public void write(DataOutputStream out) throws IOException { + List fileList = new ArrayList(this.uploadedFiles); + Collections.sort(fileList); + + for (File logFile : fileList) { + final long fileLength = logFile.length(); + // Write the logFile Type + out.writeUTF(logFile.getName()); + + // Write the log length as UTF so that it is printable + out.writeUTF(String.valueOf(fileLength)); + + // Write the log itself + FileInputStream in = null; + try { + in = SecureIOUtils.openForRead(logFile, getUser(), null); + byte[] buf = new byte[65535]; + int len = 0; + long bytesLeft = fileLength; + while ((len = in.read(buf)) != -1) { + //If buffer contents within fileLength, write + if (len < bytesLeft) { + out.write(buf, 0, len); + bytesLeft-=len; } - } catch (IOException e) { - String message = "Error aggregating log file. 
Log file : " - + logFile.getAbsolutePath() + e.getMessage(); - LOG.error(message, e); - out.write(message.getBytes()); - } finally { - if (in != null) { - in.close(); + //else only write contents within fileLength, then exit early + else { + out.write(buf, 0, (int)bytesLeft); + break; } } + long newLength = logFile.length(); + if(fileLength < newLength) { + LOG.warn("Aggregated logs truncated by approximately "+ + (newLength-fileLength) +" bytes."); + } + } catch (IOException e) { + String message = "Error aggregating log file. Log file : " + + logFile.getAbsolutePath() + e.getMessage(); + LOG.error(message, e); + out.write(message.getBytes()); + } finally { + if (in != null) { + in.close(); + } } } } - + // Added for testing purpose. public String getUser() { return user; } + + private Set getFilteredLogFiles(File containerLogDir) { + Set candiates = + new HashSet(Arrays.asList(containerLogDir.listFiles())); + for (File logFile : candiates) { + this.allExistingFileMeta.add(fileMeta(logFile)); + } + + if (this.logContext != null && candiates.size() > 0) { + if (this.logContext.getLogIncludePatterns() != null + && !this.logContext.getLogIncludePatterns().isEmpty()) { + FileFilter includeLogFileFilter = + new WildcardFileFilter(new ArrayList( + this.logContext.getLogIncludePatterns())); + candiates = + new HashSet(Arrays.asList(containerLogDir + .listFiles(includeLogFileFilter))); + } + + FileFilter excludeLogFileFilter = null; + if (this.logContext.getLogExcludePatterns() != null + && !this.logContext.getLogExcludePatterns().isEmpty()) { + excludeLogFileFilter = + new WildcardFileFilter(new ArrayList( + this.logContext.getLogExcludePatterns())); + } + final Set excludeFiles = + excludeLogFileFilter == null ? 
new HashSet() + : new HashSet(Arrays.asList(containerLogDir + .listFiles(excludeLogFileFilter))); + Iterable mask = + Iterables.filter(candiates, new Predicate() { + @Override + public boolean apply(File next) { + return !alreadyUploadedLogFiles.contains(fileMeta(next)) + && !excludeFiles.contains(next); + } + }); + candiates = Sets.newHashSet(mask); + } + return candiates; + } + + public Set getCurrentUpLoadedFilesPath() { + Set path = new HashSet(); + for (File file : this.uploadedFiles) { + path.add(new Path(file.getAbsolutePath())); + } + return path; + } + + public Set getCurrentUpLoadedFileMeta() { + Set info = new HashSet(); + for (File file : this.uploadedFiles) { + info.add(fileMeta(file)); + } + return info; + } + + public Set getAllExistingFilesMeta() { + return this.allExistingFileMeta; + } + + private String fileMeta(File file) { + return containerId.toString() + "_" + file.getName() + "_" + + file.lastModified(); + } } /** @@ -242,16 +339,22 @@ public String getUser() { private final FSDataOutputStream fsDataOStream; private final TFile.Writer writer; + private final Path remoteAppLogFile; + private FileContext fc; public LogWriter(final Configuration conf, final Path remoteAppLogFile, UserGroupInformation userUgi) throws IOException { + this.remoteAppLogFile = remoteAppLogFile; try { this.fsDataOStream = userUgi.doAs(new PrivilegedExceptionAction() { @Override public FSDataOutputStream run() throws Exception { - FileContext fc = FileContext.getFileContext(conf); + fc = FileContext.getFileContext(conf); fc.setUMask(APP_LOG_FILE_UMASK); + if (fc.util().exists(remoteAppLogFile)) { + fc.delete(remoteAppLogFile, false); + } return fc.create( remoteAppLogFile, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), @@ -304,6 +407,14 @@ public void writeApplicationACLs(Map appAcls) } public void append(LogKey logKey, LogValue logValue) throws IOException { + if (logValue.numOflogFiles() == 0) { + // No new logs need to uploaded at this time + // delete the 
temporary remote App file + if (fc.util().exists(remoteAppLogFile)) { + fc.delete(remoteAppLogFile, false); + return; + } + } DataOutputStream out = this.writer.prepareAppendKey(-1); logKey.write(out); out.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 4445ff9..96cbdf6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -28,6 +28,7 @@ @Private public class LogAggregationUtils { + public static final String TMP_FILE = "TEMP.tmp"; /** * Constructs the full filename for an application's log file per node. * @param remoteRootLogDir @@ -44,6 +45,21 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, } /** + * Get the remote node dir under remote app log dir. + * @param remoteRootLogDir + * @param appId + * @param user + * @param nodeId + * @param suffix + * @return the remote log file. + */ + public static Path getRemoteNodeLogDirForApp(Path remoteRootLogDir, + ApplicationId appId, String user, NodeId nodeId, String suffix) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), + getNodeString(nodeId)); + } + + /** * Gets the remote app log dir. 
* @param remoteRootLogDir * @param appId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 3bafdb3..3eb4a06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -52,19 +52,42 @@ public int dumpAContainersLogs(String appId, String containerId, YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()); - Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp( + Path remoteAppNodeLogDir = LogAggregationUtils.getRemoteNodeLogFileForApp( remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, ConverterUtils.toNodeId(nodeId), suffix); - AggregatedLogFormat.LogReader reader; + RemoteIterator nodeFiles; try { - reader = new AggregatedLogFormat.LogReader(getConf(), logPath); - } catch (FileNotFoundException fnfe) { - System.out.println("Logs not available at " + logPath.toString()); + Path qualifiedLogDir = + FileContext.getFileContext(getConf()).makeQualified( + remoteAppNodeLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), getConf()) + .listStatus(remoteAppNodeLogDir); + } catch (FileNotFoundException fnf) { + System.out.println("Logs not available at " + + remoteAppNodeLogDir.toString()); System.out .println("Log aggregation has not completed or is not enabled."); return -1; } - return dumpAContainerLogs(containerId, reader, System.out); + boolean foundContainerLogs = false; + while (nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + if (thisNodeFile.getPath().getName().contains( + 
remoteAppNodeLogDir.getName())) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(getConf(), new Path( + remoteAppNodeLogDir, thisNodeFile.getPath().getName())); + if (dumpAContainerLogs(containerId, reader, System.out) > -1) { + foundContainerLogs = true; + } + } + } + if (!foundContainerLogs) { + System.out.println("Logs for container " + containerId + + " are not present in this log-file."); + } + return 0; } @Private @@ -81,8 +104,6 @@ public int dumpAContainerLogs(String containerIdStr, } if (valueStream == null) { - System.out.println("Logs for container " + containerIdStr - + " are not present in this log-file."); return -1; } @@ -120,34 +141,53 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner, return -1; } while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader( - getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName())); - try { - - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null) { - String containerString = "\n\nContainer: " + key + " on " - + thisNodeFile.getPath().getName(); - out.println(containerString); - out.println(StringUtils.repeat("=", containerString.length())); - while (true) { + FileStatus thisNodeDir = nodeFiles.next(); + if (thisNodeDir.isDirectory()) { + RemoteIterator logFiles; + try { + Path qualifiedLogDir = + FileContext.getFileContext(getConf()).makeQualified( + thisNodeDir.getPath()); + logFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), getConf()) + .listStatus(thisNodeDir.getPath()); + } catch (FileNotFoundException fnf) { + continue; + } + while (logFiles.hasNext()) { + FileStatus logFile = logFiles.next(); + if (!logFile.getPath().getName().equals(LogAggregationUtils.TMP_FILE)) { + AggregatedLogFormat.LogReader reader = + new 
AggregatedLogFormat.LogReader(getConf(),logFile.getPath()); try { - LogReader.readAContainerLogsForALogType(valueStream, out); - } catch (EOFException eof) { - break; + + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + + while (valueStream != null) { + String containerString = + "\n\nContainer: " + key + " on " + + logFile.getPath().getName(); + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); + while (true) { + try { + LogReader.readAContainerLogsForALogType(valueStream, out); + } catch (EOFException eof) { + break; + } + } + + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); } } - - // Next container - key = new LogKey(); - valueStream = reader.next(key); } - } finally { - reader.close(); } } return 0; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c6572e9..caacef8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -178,6 +178,7 @@ public static void setup() throws Exception { "http", "localhost", 8080, "file0")); typeValueCache.put(SerializedException.class, SerializedException.newInstance(new IOException("exception for test"))); + generateByNewInstance(LogContext.class); generateByNewInstance(ApplicationId.class); generateByNewInstance(ApplicationAttemptId.class); generateByNewInstance(ContainerId.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 94902d4..a734484 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -173,7 +173,7 @@ public void testNoLogs() throws Exception { block.getWriter().flush(); String out = data.toString(); - assertTrue(out.contains("No logs available for container container_0_0001_01_000001")); + assertTrue(out.contains("Logs not available for entity")); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 12166e0..eb8b8c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -72,9 +72,11 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl; import 
org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -275,11 +277,16 @@ private void recoverApplication(ContainerManagerApplicationProto p) aclProto.getAcl()); } + LogContext logContext = null; + if (p.getLogContext() != null) { + logContext = new LogContextPBImpl(p.getLogContext()); + } + LOG.info("Recovering application " + appId); ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); - app.handle(new ApplicationInitEvent(appId, acls)); + app.handle(new ApplicationInitEvent(appId, acls, logContext)); } @SuppressWarnings("unchecked") @@ -719,13 +726,15 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, String user, Credentials credentials, - Map appAcls) { + Map appAcls, LogContext logContext) { ContainerManagerApplicationProto.Builder builder = ContainerManagerApplicationProto.newBuilder(); builder.setId(((ApplicationIdPBImpl) appId).getProto()); builder.setUser(user); - + if (logContext != null) { + builder.setLogContext(((LogContextPBImpl) logContext).getProto()); + } builder.clearCredentials(); if (credentials != null) { DataOutputBuffer dob = new DataOutputBuffer(); @@ -828,10 +837,12 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, LOG.info("Creating a new application reference for app " + applicationID); Map appAcls = container.getLaunchContext().getApplicationACLs(); + LogContext logContext = container.getLaunchContext().getLogContext(); context.getNMStateStore().storeApplication(applicationID, - buildAppProto(applicationID, user, credentials, appAcls)); + buildAppProto(applicationID, user, credentials, appAcls, + logContext)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, appAcls)); + new 
ApplicationInitEvent(applicationID, appAcls, logContext)); } this.context.getNMStateStore().storeContainer(containerId, request); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index cc5544c..5eeb37b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -237,7 +237,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, - app.applicationACLs)); + app.applicationACLs, initEvent.getLogContext())); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java index 5746b6a..fdd828f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java @@ -22,18 +22,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogContext; public class ApplicationInitEvent extends ApplicationEvent { private final Map applicationACLs; + private final LogContext logContext; public ApplicationInitEvent(ApplicationId appId, Map acls) { super(appId, ApplicationEventType.INIT_APPLICATION); this.applicationACLs = acls; + this.logContext = null; + } + + public ApplicationInitEvent(ApplicationId appId, + Map acls, LogContext logContext) { + super(appId, ApplicationEventType.INIT_APPLICATION); + this.applicationACLs = acls; + this.logContext = logContext; } public Map getApplicationACLs() { return this.applicationACLs; } + + public LogContext getLogContext() { + return this.logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 1af48bb..ef60328 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -20,14 +20,17 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import 
java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,35 +39,44 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogContext; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + public class AppLogAggregatorImpl implements AppLogAggregator { private static final Log LOG = LogFactory .getLog(AppLogAggregatorImpl.class); 
private static final int THREAD_SLEEP_TIME = 1000; - private static final String TMP_FILE_SUFFIX = ".tmp"; private final LocalDirsHandlerService dirsHandler; private final Dispatcher dispatcher; private final ApplicationId appId; + private final NodeId nodeId; private final String applicationId; private boolean logAggregationDisabled = false; private final Configuration conf; private final DeletionService delService; private final UserGroupInformation userUgi; - private final Path remoteNodeLogFileForApp; - private final Path remoteNodeTmpLogFileForApp; + private final Path remoteNodeLogDirForApp; + private Path remoteNodeTmpLogFileForApp; private final ContainerLogsRetentionPolicy retentionPolicy; private final BlockingQueue pendingContainers; @@ -72,53 +84,37 @@ private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final AtomicBoolean aborted = new AtomicBoolean(); private final Map appAcls; - - private LogWriter writer = null; + private final LogContext logContext; + private final Context context; + private String remoteLogFileName; + private Set currentUploadedFiles = new HashSet(); + private Set alreadyUploadedLogs = new HashSet(); + private Set currentExistingLogFiles = new HashSet(); public AppLogAggregatorImpl(Dispatcher dispatcher, - DeletionService deletionService, Configuration conf, ApplicationId appId, - UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler, - Path remoteNodeLogFileForApp, + DeletionService deletionService, Configuration conf, NodeId nodeId, + ApplicationId appId, UserGroupInformation userUgi, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogDirForApp, ContainerLogsRetentionPolicy retentionPolicy, - Map appAcls) { + Map appAcls, LogContext logContext, + Context context) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; + this.nodeId = nodeId; this.appId = appId; this.applicationId = ConverterUtils.toString(appId); this.userUgi = userUgi; this.dirsHandler 
= dirsHandler; - this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; - this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + this.remoteNodeLogDirForApp = remoteNodeLogDirForApp; this.retentionPolicy = retentionPolicy; this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; + this.logContext = logContext; + this.context = context; } - private void uploadLogsForContainer(ContainerId containerId) { - - if (this.logAggregationDisabled) { - return; - } - - // Lazy creation of the writer - if (this.writer == null) { - LOG.info("Starting aggregate log-file for app " + this.applicationId - + " at " + this.remoteNodeTmpLogFileForApp); - try { - this.writer = - new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); - //Write ACLs once when and if the writer is created. - this.writer.writeApplicationACLs(appAcls); - this.writer.writeApplicationOwner(this.userUgi.getShortUserName()); - } catch (IOException e) { - LOG.error("Cannot create writer for app " + this.applicationId - + ". Disabling log-aggregation for this app.", e); - this.logAggregationDisabled = true; - return; - } - } + private void uploadLogsForContainer(ContainerId containerId, LogWriter writer) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " @@ -126,13 +122,91 @@ private void uploadLogsForContainer(ContainerId containerId) { LogKey logKey = new LogKey(containerId); LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId, - userUgi.getShortUserName()); + userUgi.getShortUserName(), this.logContext, this.alreadyUploadedLogs); try { - this.writer.append(logKey, logValue); + writer.append(logKey, logValue); } catch (IOException e) { LOG.error("Couldn't upload logs for " + containerId + ". 
Skipping this container.");
     }
+    currentUploadedFiles.addAll(logValue.getCurrentUpLoadedFilesPath());
+    alreadyUploadedLogs.addAll(logValue.getCurrentUpLoadedFileMeta());
+    currentExistingLogFiles.addAll(logValue.getAllExistingFilesMeta());
+  }
+
+  private void uploadLogsForContainers(Set containers) {
+    if (this.logAggregationDisabled || containers.isEmpty()) {
+      return;
+    }
+    this.remoteLogFileName =
+        nodeId.toString().replace(":", "_") + "_" + System.currentTimeMillis();
+    this.remoteNodeTmpLogFileForApp =
+        getRemoteNodeTmpLogFileForApp(this.remoteNodeLogDirForApp);
+
+    LogWriter writer = null;
+    try {
+      try {
+        writer =
+            new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
+              this.userUgi);
+        // Write ACLs once when the writer is created.
+        writer.writeApplicationACLs(appAcls);
+        writer.writeApplicationOwner(this.userUgi.getShortUserName());
+
+      } catch (IOException e1) {
+        LOG.error("Cannot create writer for app " + this.applicationId
+            + ". Skip log upload this time. ");
+        return;
+      }
+
+      currentUploadedFiles.clear();
+      currentExistingLogFiles.clear();
+      for (ContainerId container : containers) {
+        uploadLogsForContainer(container, writer);
+      }
+
+      this.delService.delete(this.userUgi.getShortUserName(), null,
+        currentUploadedFiles.toArray(new Path[currentUploadedFiles.size()]));
+
+      // if any of the previously uploaded logs have been deleted,
+      // we need to remove them from alreadyUploadedLogs
+      Iterable mask =
+          Iterables.filter(alreadyUploadedLogs, new Predicate() {
+            @Override
+            public boolean apply(String next) {
+              return currentExistingLogFiles.contains(next);
+            }
+          });
+
+      alreadyUploadedLogs = Sets.newHashSet(mask);
+
+      if (writer != null) {
+        writer.close();
+      }
+
+      try {
+        userUgi.doAs(new PrivilegedExceptionAction() {
+          @Override
+          public Object run() throws Exception {
+            FileSystem remoteFS = FileSystem.get(conf);
+            if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
+              remoteFS.rename(remoteNodeTmpLogFileForApp, new Path(
remoteNodeLogDirForApp, remoteLogFileName)); + } + return null; + } + }); + } catch (Exception e) { + LOG.error( + "Failed to move temporary log file to final location: [" + + remoteNodeTmpLogFileForApp + "] to [" + + remoteNodeLogDirForApp + "]", e); + } + } finally { + if (writer != null) { + writer.close(); + } + } } @Override @@ -149,12 +223,19 @@ public void run() { @SuppressWarnings("unchecked") private void doAppLogAggregation() { - ContainerId containerId; - while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { - wait(THREAD_SLEEP_TIME); + if (this.logContext != null && this.logContext.getLogInterval() > 0) { + wait(this.logContext.getLogInterval() * 24 * 3600 * 1000); + if (this.appFinishing.get() || this.aborted.get()) { + break; + } + uploadLogsForContainers(this.context.getApplications() + .get(this.appId).getContainers().keySet()); + } else { + wait(THREAD_SLEEP_TIME); + } } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); this.appFinishing.set(true); @@ -166,10 +247,13 @@ private void doAppLogAggregation() { return; } - // Application is finished. Finish pending-containers + // App is finished, upload the container logs. 
+ Set ids = new HashSet(); + ContainerId containerId; while ((containerId = this.pendingContainers.poll()) != null) { - uploadLogsForContainer(containerId); + ids.add(containerId); } + uploadLogsForContainers(ids); // Remove the local app-log-dirs List rootLogDirs = dirsHandler.getLogDirs(); @@ -181,26 +265,6 @@ private void doAppLogAggregation() { } this.delService.delete(this.userUgi.getShortUserName(), null, localAppLogDirs); - - if (this.writer != null) { - this.writer.close(); - LOG.info("Finished aggregate log-file for app " + this.applicationId); - } - - try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - FileSystem remoteFS = FileSystem.get(conf); - remoteFS.rename(remoteNodeTmpLogFileForApp, remoteNodeLogFileForApp); - return null; - } - }); - } catch (Exception e) { - LOG.error("Failed to move temporary log file to final location: [" - + remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp - + "]", e); - } this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, @@ -208,9 +272,8 @@ public Object run() throws Exception { this.appAggregationFinished.set(true); } - private Path getRemoteNodeTmpLogFileForApp() { - return new Path(remoteNodeLogFileForApp.getParent(), - (remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX)); + private Path getRemoteNodeTmpLogFileForApp(Path remoteNodeLogDirForApp) { + return new Path(remoteNodeLogDirForApp, LogAggregationUtils.TMP_FILE); } private boolean shouldUploadLogs(ContainerId containerId, @@ -267,4 +330,11 @@ public synchronized void abortLogAggregation() { this.aborted.set(true); this.notifyAll(); } + + @Private + @VisibleForTesting + public synchronized void doLogAggregationOutOfBand() { + LOG.info("Do OutOfBand log aggregation"); + this.notifyAll(); + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 58e1837..b6011c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -58,7 +59,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LogAggregationService extends AbstractService implements @@ -223,6 +227,12 @@ Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) { this.remoteRootLogDirSuffix); } + Path 
getRemoteNodeLogDirForApp(ApplicationId appId, String user) { + return LogAggregationUtils.getRemoteNodeLogDirForApp( + this.remoteRootLogDir, appId, user, this.nodeId, + this.remoteRootLogDirSuffix); + } + private void createDir(FileSystem fs, Path path, FsPermission fsPerm) throws IOException { FsPermission dirPerm = new FsPermission(fsPerm); @@ -259,33 +269,42 @@ public Object run() throws Exception { // Only creating directories if they are missing to avoid // unnecessary load on the filesystem from all of the nodes - Path appDir = LogAggregationUtils.getRemoteAppLogDir( + Path node_appDir = LogAggregationUtils.getRemoteNodeLogDirForApp( LogAggregationService.this.remoteRootLogDir, appId, user, - LogAggregationService.this.remoteRootLogDirSuffix); - appDir = appDir.makeQualified(remoteFS.getUri(), + nodeId, LogAggregationService.this.remoteRootLogDirSuffix); + node_appDir = node_appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - LogAggregationService.this.remoteRootLogDir, user, + if (! 
checkExists(remoteFS, node_appDir, APP_DIR_PERMISSIONS)) { + Path appDir = LogAggregationUtils.getRemoteAppLogDir( + LogAggregationService.this.remoteRootLogDir, appId, user, LogAggregationService.this.remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + appDir = appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - LogAggregationService.this.remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), + if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + LogAggregationService.this.remoteRootLogDir, user, + LogAggregationService.this.remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + LogAggregationService.this.remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, node_appDir, APP_DIR_PERMISSIONS); } } catch (IOException e) { LOG.error("Failed to setup application log directory for " @@ -303,11 +322,12 @@ public Object run() throws Exception { @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy 
logRetentionPolicy, - Map appAcls) { + Map appAcls, LogContext logContext) { ApplicationEvent eventResponse; try { verifyAndCreateRemoteLogDir(getConfig()); - initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); + initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls, + logContext); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); } catch (YarnRuntimeException e) { @@ -320,7 +340,7 @@ private void initApp(final ApplicationId appId, String user, protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, - Map appAcls) { + Map appAcls, LogContext logContext) { // Get user's FileSystem credentials final UserGroupInformation userUgi = @@ -332,9 +352,9 @@ protected void initAppAggregator(final ApplicationId appId, String user, // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, - getConfig(), appId, userUgi, dirsHandler, - getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, - appAcls); + getConfig(), nodeId, appId, userUgi, dirsHandler, + getRemoteNodeLogDirForApp(appId, user), logRetentionPolicy, + appAcls, logContext, this.context); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } @@ -421,7 +441,7 @@ public void handle(LogHandlerEvent event) { initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), appStartEvent.getCredentials(), appStartEvent.getLogRetentionPolicy(), - appStartEvent.getApplicationAcls()); + appStartEvent.getApplicationAcls(), appStartEvent.getLogContext()); break; case CONTAINER_FINISHED: LogHandlerContainerFinishedEvent containerFinishEvent = @@ -439,4 +459,14 @@ public void handle(LogHandlerEvent event) { } } + + public Clock createSystemClock() { + return new SystemClock(); + } + + @Private 
+ @VisibleForTesting + public ConcurrentMap getAppLogAggregators() { + return this.appLogAggregators; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index 6c07674..84c5930 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -23,6 +23,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; public class LogHandlerAppStartedEvent extends LogHandlerEvent { @@ -32,6 +33,7 @@ private final String user; private final Credentials credentials; private final Map appAcls; + private final LogContext logContext; public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, @@ -42,6 +44,19 @@ public LogHandlerAppStartedEvent(ApplicationId appId, String user, this.credentials = credentials; this.retentionPolicy = retentionPolicy; this.appAcls = appAcls; + this.logContext = null; + } + + public LogHandlerAppStartedEvent(ApplicationId appId, String user, + Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, + 
Map appAcls, LogContext logContext) { + super(LogHandlerEventType.APPLICATION_STARTED); + this.applicationId = appId; + this.user = user; + this.credentials = credentials; + this.retentionPolicy = retentionPolicy; + this.appAcls = appAcls; + this.logContext = logContext; } public ApplicationId getApplicationId() { @@ -64,4 +79,7 @@ public String getUser() { return this.appAcls; } + public LogContext getLogContext() { + return this.logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index e6f39f6..37501c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -29,6 +29,7 @@ message ContainerManagerApplicationProto { optional string user = 2; optional bytes credentials = 3; repeated ApplicationACLMapProto acls = 4; + optional LogContextProto log_context = 5; } message DeletionServiceDeleteTaskProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6ab594f..49e994e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -37,6 +37,7 @@ import java.io.DataInputStream; import java.io.EOFException; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; @@ -50,14 +51,18 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.junit.Assert; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; @@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -88,26 +94,29 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.Context; import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mortbay.util.MultiException; - - //@Ignore public class TestLogAggregationService extends BaseContainerManagerTest { @@ -178,7 +187,8 @@ public void testLocalFileDeletionAfterUpload() throws Exception { BuilderUtils.newApplicationAttemptId(application1, 1); ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1); // Simulate log-file creation - writeContainerLogs(app1LogDir, container11); + writeContainerLogs(app1LogDir, container11, new String[] { "stdout", + "stderr", "syslog" }); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container11, 0)); @@ -258,10 +268,21 @@ public void testNoContainerOnNode() 
throws Exception { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - Assert.assertFalse(new File(logAggregationService + // Node directory after ApplicationId should have been existed + Assert.assertTrue(new File(logAggregationService .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) .exists()); - + Path nodeLogDir = + logAggregationService.getRemoteNodeLogFileForApp(application1, + this.user); + + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified(nodeLogDir); + RemoteIterator nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(nodeLogDir); + Assert.assertFalse(nodeFiles.hasNext()); + dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ @@ -283,7 +304,7 @@ public void testMultipleAppsLogAggregation() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - + String[] fileNames = new String[] { "stdout", "stderr", "syslog" }; DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); dispatcher.register(ApplicationEventType.class, appEventHandler); @@ -310,7 +331,7 @@ public void testMultipleAppsLogAggregation() throws Exception { ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); // Simulate log-file creation - writeContainerLogs(app1LogDir, container11); + writeContainerLogs(app1LogDir, container11, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container11, 0)); @@ -328,13 +349,13 @@ public void testMultipleAppsLogAggregation() throws Exception { ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1); - writeContainerLogs(app2LogDir, container21); + writeContainerLogs(app2LogDir, container21, fileNames); logAggregationService.handle( new 
LogHandlerContainerFinishedEvent(container21, 0)); ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2); - writeContainerLogs(app1LogDir, container12); + writeContainerLogs(app1LogDir, container12, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container12, 0)); @@ -365,22 +386,22 @@ public void testMultipleAppsLogAggregation() throws Exception { reset(appEventHandler); ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); - writeContainerLogs(app3LogDir, container31); + writeContainerLogs(app3LogDir, container31, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container31, 0)); ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2); - writeContainerLogs(app3LogDir, container32); + writeContainerLogs(app3LogDir, container32, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container32, 1)); // Failed ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2); - writeContainerLogs(app2LogDir, container22); + writeContainerLogs(app2LogDir, container22, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container22, 0)); ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3); - writeContainerLogs(app3LogDir, container33); + writeContainerLogs(app3LogDir, container33, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container33, 0)); @@ -395,11 +416,11 @@ public void testMultipleAppsLogAggregation() throws Exception { assertEquals(0, logAggregationService.getNumAggregators()); verifyContainerLogs(logAggregationService, application1, - new ContainerId[] { container11, container12 }); + new ContainerId[] { container11, container12 }, fileNames, 3, 1); verifyContainerLogs(logAggregationService, application2, - new ContainerId[] { container21 }); + new ContainerId[] { container21 }, fileNames, 3, 1); 
verifyContainerLogs(logAggregationService, application3, - new ContainerId[] { container31, container32 }); + new ContainerId[] { container31, container32 }, fileNames, 3, 1); dispatcher.await(); @@ -588,10 +609,10 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { ApplicationId appId = BuilderUtils.newApplicationId( System.currentTimeMillis(), (int)Math.random()); - doThrow(new YarnRuntimeException("KABOOM!")) - .when(logAggregationService).initAppAggregator( - eq(appId), eq(user), any(Credentials.class), - any(ContainerLogsRetentionPolicy.class), anyMap()); + doThrow(new YarnRuntimeException("KABOOM!")).when(logAggregationService) + .initAppAggregator(eq(appId), eq(user), any(Credentials.class), + any(ContainerLogsRetentionPolicy.class), anyMap(), + any(LogContext.class)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, @@ -672,25 +693,51 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() assertEquals(0, logAggregationService.getNumAggregators()); } - private void writeContainerLogs(File appLogDir, ContainerId containerId) - throws IOException { + private void writeContainerLogs(File appLogDir, ContainerId containerId, + String[] fileName) throws IOException { // ContainerLogDir should be created String containerStr = ConverterUtils.toString(containerId); File containerLogDir = new File(appLogDir, containerStr); containerLogDir.mkdir(); - for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { + for (String fileType : fileName) { Writer writer11 = new FileWriter(new File(containerLogDir, fileType)); writer11.write(containerStr + " Hello " + fileType + "!"); writer11.close(); } } - private void verifyContainerLogs( - LogAggregationService logAggregationService, ApplicationId appId, - ContainerId[] expectedContainerIds) throws IOException { + private void verifyContainerLogs(LogAggregationService logAggregationService, + ApplicationId appId, ContainerId[] 
expectedContainerIds, + String[] logFiles, int numOfContainerLogs, int iteratorNum) + throws IOException { + Path nodeLogDir = + logAggregationService.getRemoteNodeLogFileForApp(appId, this.user); + RemoteIterator nodeFiles = null; + try { + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified( + nodeLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(nodeLogDir); + } catch (FileNotFoundException fnf) { + Assert.fail("Should have log files"); + } + + Assert.assertTrue(nodeFiles.hasNext()); + FileStatus targetNodeFile = null; + long time_stamp = 0; + while (nodeFiles.hasNext()) { + FileStatus nodeFile = nodeFiles.next(); + if (nodeFile.getModificationTime() > time_stamp) { + targetNodeFile = nodeFile; + time_stamp = nodeFile.getModificationTime(); + } + } + Assert.assertNotNull(targetNodeFile); AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(this.conf, - logAggregationService.getRemoteNodeLogFileForApp(appId, this.user)); + targetNodeFile.getPath()); Assert.assertEquals(this.user, reader.getApplicationOwner()); verifyAcls(reader.getApplicationAcls()); @@ -749,8 +796,8 @@ private void verifyContainerLogs( for (ContainerId cId : expectedContainerIds) { String containerStr = ConverterUtils.toString(cId); Map thisContainerMap = logMap.remove(containerStr); - Assert.assertEquals(3, thisContainerMap.size()); - for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { + Assert.assertEquals(numOfContainerLogs, thisContainerMap.size()); + for (String fileType : logFiles) { String expectedValue = containerStr + " Hello " + fileType + "!"; LOG.info("Expected log-content : " + new String(expectedValue)); String foundValue = thisContainerMap.remove(fileType); @@ -987,4 +1034,285 @@ private static String eventToString(Event event, String[] methods) throws Exc sb.append("]"); return sb.toString(); } + + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public 
void testLogAggregationServiceWithPatterns() throws Exception { + + LogContext logContextWithIncludePatterns = + Records.newRecord(LogContext.class); + Set includePatterns = new HashSet(); + includePatterns.add("stdout"); + includePatterns.add("syslog"); + logContextWithIncludePatterns.setLogIncludePatterns(includePatterns); + + LogContext logContextWithExcludePatterns = + Records.newRecord(LogContext.class); + Set excludePatterns = new HashSet(); + excludePatterns.add("stdout"); + excludePatterns.add("syslog"); + + logContextWithExcludePatterns.setLogExcludePatterns(excludePatterns); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + + // LogContext for application1 has includePatten which includes + // stdout and syslog. 
+ // After logAggregation is finished, we expect the logs for application1 + // has only logs from stdout and syslog + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + + // AppLogDir should be created + File appLogDir1 = + new File(localLogDir, ConverterUtils.toString(application1)); + appLogDir1.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application1, + this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + logContextWithIncludePatterns)); + + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(application1, 1); + ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1); + + // Simulate log-file creation + writeContainerLogs(appLogDir1, container1, new String[] { "stdout", + "stderr", "syslog" }); + logAggregationService.handle(new LogHandlerContainerFinishedEvent( + container1, 0)); + + // LogContext for application2 has excludePatten which includes + // stdout and syslog. + // After logAggregation is finished, we expect the logs for application2 + // has only logs from stderr + ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2); + ApplicationAttemptId appAttemptId2 = + BuilderUtils.newApplicationAttemptId(application2, 1); + + File app2LogDir = + new File(localLogDir, ConverterUtils.toString(application2)); + app2LogDir.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application2, + this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, + this.acls, logContextWithExcludePatterns)); + ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1); + + writeContainerLogs(app2LogDir, container2, new String[] { "stdout", + "stderr", "syslog" }); + logAggregationService.handle( + new LogHandlerContainerFinishedEvent(container2, 0)); + + dispatcher.await(); + ApplicationEvent expectedInitEvents[] = + new ApplicationEvent[] { new ApplicationEvent(application1, + 
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) }; + checkEvents(appEventHandler, expectedInitEvents, false, "getType", + "getApplicationID"); + reset(appEventHandler); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); + logAggregationService.handle(new LogHandlerAppFinishedEvent(application2)); + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + + String[] logFiles = new String[] { "stdout", "syslog" }; + verifyContainerLogs(logAggregationService, application1, + new ContainerId[] { container1 }, logFiles, 2, 1); + + logFiles = new String[] { "stderr" }; + verifyContainerLogs(logAggregationService, application2, + new ContainerId[] { container2 }, logFiles, 1, 1); + + dispatcher.await(); + + ApplicationEvent[] expectedFinishedEvents = + new ApplicationEvent[] { new ApplicationEvent(application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), + new ApplicationEvent(application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}; + checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", + "getApplicationID"); + dispatcher.stop(); + } + + @SuppressWarnings("unchecked") + @Test (timeout = 50000) + public void testLogAggregationServiceWithInterval() throws Exception { + + int max_attempt = 50; + + LogContext logContextWithInterval = + Records.newRecord(LogContext.class); + logContextWithInterval.setLogInterval(5000); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + ApplicationId 
application = BuilderUtils.newApplicationId(123456, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application, 1); + ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1); + + Context context = spy(this.context); + ConcurrentMap maps = + new ConcurrentHashMap(); + Application app = mock(Application.class); + Map containers = new HashMap(); + containers.put(container, mock(Container.class)); + maps.put(application, app); + when(app.getContainers()).thenReturn(containers); + when(context.getApplications()).thenReturn(maps); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, context, this.delSrvc, + super.dirsHandler); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + // AppLogDir should be created + File appLogDir = + new File(localLogDir, ConverterUtils.toString(application)); + appLogDir.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application, + this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + logContextWithInterval)); + + // Simulate log-file creation + String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" }; + writeContainerLogs(appLogDir, container, logFiles1); + Thread.sleep(1000); + + // Do log aggregation + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application); + aggregator.doLogAggregationOutOfBand(); + + // Make sure the log is uploaded + int count = 0; + while (count <= max_attempt) { + if (checkLogFileAvailable(logAggregationService, application, 1)) { + break; + } + Thread.sleep(200); + count++; + } + + // Container logs should be uploaded + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFiles1, 3, 1); + + // There is no log generated at this time. Do the log aggregation again. 
+ aggregator.doLogAggregationOutOfBand(); + // Make sure there is no new log uploaded + count = 0; + while(count <= 10) { + if (! checkLogFileAvailable(logAggregationService, application, 1)) { + Assert.fail("Should not get any new logs"); + } + Thread.sleep(200); + count ++; + } + Assert.assertTrue(checkLogFileAvailable(logAggregationService, + application, 1)); + + // Do log aggregation + String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" }; + writeContainerLogs(appLogDir, container, logFiles2); + Thread.sleep(1000); + + aggregator.doLogAggregationOutOfBand(); + + // Make sure the log is uploaded + count = 0; + while (count <= max_attempt) { + if (checkLogFileAvailable(logAggregationService, application, 2)) { + break; + } + Thread.sleep(200); + count++; + } + + Assert.assertTrue(checkLogFileAvailable(logAggregationService, application, + 2)); + + // Container logs should be uploaded + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFiles2, 3, 2); + + // create another logs + String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" }; + writeContainerLogs(appLogDir, container, logFiles3); + Thread.sleep(1000); + + logAggregationService.handle( + new LogHandlerContainerFinishedEvent(container, 0)); + + dispatcher.await(); + logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); + + // Make sure the log is uploaded + count = 0; + while (count <= max_attempt) { + if (checkLogFileAvailable(logAggregationService, application, 3)) { + break; + } + Thread.sleep(200); + count++; + } + Assert.assertTrue(checkLogFileAvailable(logAggregationService, application, + 3)); + + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFiles3, 3, 3); + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + dispatcher.stop(); + + } + + private boolean checkLogFileAvailable( + LogAggregationService 
logAggregationService, ApplicationId appId, + int numOfLogFiles) throws IOException { + Path nodeLogDir = + logAggregationService.getRemoteNodeLogFileForApp(appId, this.user); + RemoteIterator nodeFiles = null; + try { + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified(nodeLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(nodeLogDir); + } catch (FileNotFoundException fnf) { + return false; + } + int count = 0; + while (nodeFiles.hasNext()) { + FileStatus status = nodeFiles.next(); + if (status.getPath().getName().contains(nodeLogDir.getName())) { + count++; + } + } + return count == numOfLogFiles; + } }