diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 723a2e0..05d7648 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -54,6 +54,7 @@ * validityInterval into failure count. If failure count reaches to * maxAppAttempts, the application will be failed. * + *
  • {@link LogContext} of the application.
  • * *

    * @@ -128,6 +129,21 @@ public static ApplicationSubmissionContext newInstance( return context; } + @Public + @Unstable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, LogContext logContext) { + ApplicationSubmissionContext context = + newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers); + context.setLogContext(logContext); + return context; + } /** * Get the ApplicationId of the submitted application. * @return ApplicationId of the submitted application @@ -381,4 +397,21 @@ public abstract void setKeepContainersAcrossApplicationAttempts( @Stable public abstract void setAttemptFailuresValidityInterval( long attemptFailuresValidityInterval); -} \ No newline at end of file + + + /** + * Get LogContext of the application + * @return LogContext of the application + */ + @Public + @Stable + public abstract LogContext getLogContext(); + + /** + * Set LogContext for the application + * @param logContext for the application + */ + @Public + @Stable + public abstract void setLogContext(LogContext logContext); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index a648fef..cd20939 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -22,8 +22,10 
@@ import java.util.List; import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -46,6 +48,7 @@ *
  • Optional, application-specific binary service data.
  • *
  • Environment variables for the launched process.
  • *
  • Command to launch the container.
  • + *
  • {@link LogContext} will be set by RM when it launches AM.
  • * *

    * @@ -73,6 +76,18 @@ public static ContainerLaunchContext newInstance( return container; } + @Private + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, LogContext logContext) { + ContainerLaunchContext container = newInstance(localResources, environment, + commands, serviceData, tokens, acls); + container.setLogContext(logContext); + return container; + } /** * Get all the tokens needed by this container. It may include file-system * tokens, ApplicationMaster related tokens if this container is an @@ -196,4 +211,12 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + @Private + @Stable + public abstract LogContext getLogContext(); + + @Private + @Stable + public abstract void setLogContext(LogContext logContext); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java new file mode 100644 index 0000000..c730573 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import com.google.common.base.Preconditions; + +/** + *

    LogContext represents all of the + * information needed by the NodeManager to handle + * the logs for an application.

    + * + *

    It includes details such as: + *

      + *
    • logIncludePattern. It defines include patterns which are used to + * filter the log files. The log files which match the include + * patterns will be uploaded.
    • + *
    • logExcludePattern. It defines exclude patterns which are used to + * filter the log files. The log files which match the exclude + * patterns will not be uploaded.
    • + *
    • logInterval. The default value is 0. By default, + * the logAggregationService only uploads container logs when + * the application is finished. This configuration defines + * how often the logAggregationService uploads container logs in days. + * By setting this configuration, the logAggregationService can upload container + * logs periodically when the application is running. + 
    • + *
    + *

    + * + * @see ContainerLaunchContext + * @see ApplicationSubmissionContext + */ +public abstract class LogContext { + + @Public + @Unstable + public static LogContext newInstance(Set logIncludePattern, + Set logExcludePattern, long logInterval) { + Preconditions.checkArgument(logInterval >= 0); + LogContext context = Records.newRecord(LogContext.class); + context.setLogIncludePatterns(logIncludePattern); + context.setLogExcludePatterns(logExcludePattern); + context.setLogInterval(logInterval); + return context; + } + + /** + * Get include patterns + * @return set of include patterns + */ + @Public + @Stable + public abstract Set getLogIncludePatterns(); + + /** + * Set include patterns + * @param includePatterns to set + */ + @Public + @Stable + public abstract void setLogIncludePatterns(Set includePatterns); + + /** + * Get exclude patterns + * @return set of exclude patterns + */ + @Public + @Stable + public abstract Set getLogExcludePatterns(); + + /** + * Set exclude patterns + * @param excludePatterns to set + */ + @Public + @Stable + public abstract void setLogExcludePatterns(Set excludePatterns); + + /** + * Get LogInterval per day + * @return the logInterval + */ + @Public + @Stable + public abstract long getLogInterval(); + + /** + * Set LogInterval per day + * @param logInterval + */ + @Public + @Stable + public abstract void setLogInterval(long logInterval); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d8c42cc..846cdc5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto { optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; optional int64 
attempt_failures_validity_interval = 13 [default = -1]; + optional LogContextProto log_context = 14; +} + +message LogContextProto { + repeated string log_include_pattern = 1; + repeated string log_exclude_pattern = 2; + optional int64 log_monitor_interval = 3 [default = 0]; } enum ApplicationAccessTypeProto { @@ -344,6 +351,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional LogContextProto log_context = 7; } message ContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index eb6169c..c31316a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,8 +38,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -113,17 +110,18 @@ public int run(String[] args) throws Exception { System.err.println("Invalid ApplicationId specified"); return -1; } - 
+ try { int resultCode = verifyApplicationState(appId); if (resultCode != 0) { - System.out.println("Application has not completed." + - " Logs are only available after an application completes"); + System.out.println("Application is not running or completed." + + " Logs are only available when an application is running" + + " or completes"); return resultCode; } } catch (Exception e) { - System.err.println("Unable to get ApplicationState." + - " Attempting to fetch logs directly from the filesystem."); + System.err.println("Unable to get ApplicationState." + + " Attempting to fetch logs directly from the filesystem."); } LogCLIHelpers logCliHelper = new LogCLIHelpers(); @@ -141,18 +139,9 @@ public int run(String[] args) throws Exception { printHelpMessage(printOpts); resultCode = -1; } else { - Path remoteRootLogDir = - new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - LogAggregationUtils.getRemoteNodeLogFileForApp( - remoteRootLogDir, - appId, - appOwner, - ConverterUtils.toNodeId(nodeAddress), - LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()))); - resultCode = logCliHelper.dumpAContainerLogs(containerIdStr, reader, System.out); + resultCode = + logCliHelper.dumpAContainersLogs(appIdStr, containerIdStr, + nodeAddress, appOwner); } return resultCode; @@ -167,10 +156,10 @@ private int verifyApplicationState(ApplicationId appId) throws IOException, switch (appReport.getYarnApplicationState()) { case NEW: case NEW_SAVING: + return -1; case ACCEPTED: case SUBMITTED: case RUNNING: - return -1; case FAILED: case FINISHED: case KILLED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 7b49a16..ee57f30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import com.google.common.base.CharMatcher; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -53,6 +56,7 @@ private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set applicationTags = null; + private LogContext logContext = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -110,6 +114,9 @@ private void mergeLocalToBuilder() { builder.clearApplicationTags(); 
builder.addAllApplicationTags(this.applicationTags); } + if (this.logContext != null) { + builder.setLogContext(convertToProtoFormat(this.logContext)); + } } private void mergeLocalToProto() { @@ -415,4 +422,34 @@ public void setAttemptFailuresValidityInterval( maybeInitBuilder(); builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); } + + + private LogContextPBImpl convertFromProtoFormat(LogContextProto p) { + return new LogContextPBImpl(p); + } + + private LogContextProto convertToProtoFormat(LogContext t) { + return ((LogContextPBImpl)t).getProto(); + } + + @Override + public LogContext getLogContext() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.logContext != null) { + return this.logContext; + } // Else via proto + if (!p.hasLogContext()) { + return null; + } + logContext = convertFromProtoFormat(p.getLogContext()); + return logContext; + } + + @Override + public void setLogContext(LogContext logContext) { + maybeInitBuilder(); + if (logContext == null) + builder.clearLogContext(); + this.logContext = logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..0373ef2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -30,10 +30,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LogContext; import 
org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; @@ -56,6 +58,7 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; + private LogContext logContext = null; public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); @@ -120,6 +123,9 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.logContext != null) { + builder.setLogContext(convertToProtoFormat(this.logContext)); + } } private void mergeLocalToProto() { @@ -463,4 +469,34 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } + + + private LogContextPBImpl convertFromProtoFormat(LogContextProto p) { + return new LogContextPBImpl(p); + } + + private LogContextProto convertToProtoFormat(LogContext t) { + return ((LogContextPBImpl)t).getProto(); + } + + @Override + public LogContext getLogContext() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.logContext != null) { + return this.logContext; + } // Else via proto + if (!p.hasLogContext()) { + return null; + } + logContext = convertFromProtoFormat(p.getLogContext()); + return logContext; + } + + @Override + public void setLogContext(LogContext logContext) { + maybeInitBuilder(); + if (logContext == null) + builder.clearLogContext(); + this.logContext = logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java new file mode 100644 index 0000000..73ccf71 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.LogContext; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProtoOrBuilder; + +import com.google.common.base.Preconditions; +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class LogContextPBImpl extends LogContext{ + LogContextProto proto = + LogContextProto.getDefaultInstance(); + LogContextProto.Builder builder = null; + boolean viaProto = false; + + private Set logIncludePatterns = null; + private Set logExcludePatterns = null; + + public LogContextPBImpl() { + builder = LogContextProto.newBuilder(); + } + + public LogContextPBImpl(LogContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public LogContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.logIncludePatterns != null && !this.logIncludePatterns.isEmpty()) { + builder.clearLogIncludePattern(); + builder.addAllLogIncludePattern(this.logIncludePatterns); + } + + if (this.logExcludePatterns != null && !this.logExcludePatterns.isEmpty()) { + builder.clearLogExcludePattern(); + builder.addAllLogExcludePattern(this.logExcludePatterns); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LogContextProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public Set getLogIncludePatterns() { + initLogIncludePatterns(); + return this.logIncludePatterns; + } + + @Override + public void setLogIncludePatterns(Set logPattern) { + maybeInitBuilder(); + if (logPattern == null || logPattern.isEmpty()) { + builder.clearLogIncludePattern(); + this.logIncludePatterns = null; + return; + } + + this.logIncludePatterns = new HashSet(); + this.logIncludePatterns.addAll(logPattern); + } + + private void initLogIncludePatterns() { + if (this.logIncludePatterns != null) { + return; + } + LogContextProtoOrBuilder p = viaProto ? 
proto : builder; + this.logIncludePatterns = new HashSet(); + this.logIncludePatterns.addAll(p.getLogIncludePatternList()); + } + + @Override + public Set getLogExcludePatterns() { + initLogExcludePatterns(); + return this.logExcludePatterns; + } + + @Override + public void setLogExcludePatterns(Set logPattern) { + maybeInitBuilder(); + if (logPattern == null || logPattern.isEmpty()) { + builder.clearLogExcludePattern(); + this.logExcludePatterns = null; + return; + } + + this.logExcludePatterns = new HashSet(); + this.logExcludePatterns.addAll(logPattern); + } + + private void initLogExcludePatterns() { + if (this.logExcludePatterns != null) { + return; + } + LogContextProtoOrBuilder p = viaProto ? proto : builder; + this.logExcludePatterns = new HashSet(); + this.logExcludePatterns.addAll(p.getLogExcludePatternList()); + } + + @Override + public long getLogInterval() { + LogContextProtoOrBuilder p = viaProto ? proto : builder; + return p.getLogMonitorInterval(); + } + + @Override + public void setLogInterval(long logInterval) { + Preconditions.checkArgument(logInterval >= 0); + maybeInitBuilder(); + builder.setLogMonitorInterval(logInterval); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 3568de2..729a275 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -24,6 +24,7 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; +import java.io.FileFilter; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; @@ -35,10 +36,12 @@ import 
java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; - +import java.util.Set; +import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,10 +63,15 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + @Public @Evolving public class AggregatedLogFormat { @@ -149,6 +157,10 @@ public String toString() { private final List rootLogDirs; private final ContainerId containerId; private final String user; + private final LogContext logContext; + private Set uploadedFiles = new HashSet(); + private final Set alreadyUploadedLogFiles; + private Set allExistingFileMeta = new HashSet(); // TODO Maybe add a version string here. 
Instead of changing the version of // the entire k-v format @@ -160,6 +172,21 @@ public LogValue(List rootLogDirs, ContainerId containerId, // Ensure logs are processed in lexical order Collections.sort(this.rootLogDirs); + this.logContext = null; + this.alreadyUploadedLogFiles = new HashSet(); + } + + public LogValue(List rootLogDirs, ContainerId containerId, + String user, LogContext logContext, + Set alreadyUploadedLogFiles) { + this.rootLogDirs = new ArrayList(rootLogDirs); + this.containerId = containerId; + this.user = user; + + // Ensure logs are processed in lexical order + Collections.sort(this.rootLogDirs); + this.logContext = logContext; + this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; } public void write(DataOutputStream out) throws IOException { @@ -178,10 +205,13 @@ public void write(DataOutputStream out) throws IOException { } // Write out log files in lexical order - File[] logFiles = containerLogDir.listFiles(); - Arrays.sort(logFiles); - for (File logFile : logFiles) { + this.uploadedFiles = getLogFiles(containerLogDir); + + List fileList = new ArrayList(uploadedFiles); + Collections.sort(fileList); + + for (File logFile : fileList) { final long fileLength = logFile.length(); // Write the logFile Type @@ -232,6 +262,73 @@ public void write(DataOutputStream out) throws IOException { public String getUser() { return user; } + + private Set getLogFiles(File containerLogDir) { + Set candiates = + new HashSet(Arrays.asList(containerLogDir.listFiles())); + for (File logFile : candiates) { + this.allExistingFileMeta.add(fileMeta(logFile)); + } + + if (this.logContext != null && candiates.size() > 0) { + if (this.logContext.getLogIncludePatterns() != null + && !this.logContext.getLogIncludePatterns().isEmpty()) { + FileFilter includeLogFileFilter = + new WildcardFileFilter(new ArrayList( + this.logContext.getLogIncludePatterns())); + candiates = + new HashSet(Arrays.asList(containerLogDir + .listFiles(includeLogFileFilter))); + } + + FileFilter 
excludeLogFileFilter = null; + if (this.logContext.getLogExcludePatterns() != null + && !this.logContext.getLogExcludePatterns().isEmpty()) { + excludeLogFileFilter = + new WildcardFileFilter(new ArrayList( + this.logContext.getLogExcludePatterns())); + } + final Set excludeFiles = + excludeLogFileFilter == null ? new HashSet() + : new HashSet(Arrays.asList(containerLogDir + .listFiles(excludeLogFileFilter))); + Iterable mask = + Iterables.filter(candiates, new Predicate() { + @Override + public boolean apply(File next) { + return !alreadyUploadedLogFiles.contains(fileMeta(next)) + && !excludeFiles.contains(next); + } + }); + candiates = Sets.newHashSet(mask); + } + return candiates; + } + + public Set getCurrentUpLoadedFilesPath() { + Set path = new HashSet(); + for (File file : this.uploadedFiles) { + path.add(new Path(file.getAbsolutePath())); + } + return path; + } + + public Set getCurrentUpLoadedFileMeta() { + Set info = new HashSet(); + for (File file : this.uploadedFiles) { + info.add(fileMeta(file)); + } + return info; + } + + public Set getAllExistingFilesMeta() { + return this.allExistingFileMeta; + } + + private String fileMeta(File file) { + return containerId.toString() + "_" + file.getName() + "_" + + file.lastModified(); + } } /** @@ -252,6 +349,9 @@ public LogWriter(final Configuration conf, final Path remoteAppLogFile, public FSDataOutputStream run() throws Exception { FileContext fc = FileContext.getFileContext(conf); fc.setUMask(APP_LOG_FILE_UMASK); + if (fc.util().exists(remoteAppLogFile)) { + fc.delete(remoteAppLogFile, false); + } return fc.create( remoteAppLogFile, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 4445ff9..96cbdf6 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -28,6 +28,7 @@ @Private public class LogAggregationUtils { + public static final String TMP_FILE = "TEMP.tmp"; /** * Constructs the full filename for an application's log file per node. * @param remoteRootLogDir @@ -44,6 +45,21 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir, } /** + * Get the remote node dir under remote app log dir. + * @param remoteRootLogDir + * @param appId + * @param user + * @param nodeId + * @param suffix + * @return the remote log file. + */ + public static Path getRemoteNodeLogDirForApp(Path remoteRootLogDir, + ApplicationId appId, String user, NodeId nodeId, String suffix) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), + getNodeString(nodeId)); + } + + /** * Gets the remote app log dir. 
* @param remoteRootLogDir * @param appId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 3bafdb3..1fbf5dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -52,19 +52,42 @@ public int dumpAContainersLogs(String appId, String containerId, YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()); - Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp( + Path remoteAppNodeLogDir = LogAggregationUtils.getRemoteNodeLogFileForApp( remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, ConverterUtils.toNodeId(nodeId), suffix); - AggregatedLogFormat.LogReader reader; + RemoteIterator nodeFiles; try { - reader = new AggregatedLogFormat.LogReader(getConf(), logPath); - } catch (FileNotFoundException fnfe) { - System.out.println("Logs not available at " + logPath.toString()); + Path qualifiedLogDir = + FileContext.getFileContext(getConf()).makeQualified( + remoteAppNodeLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), getConf()) + .listStatus(remoteAppNodeLogDir); + } catch (FileNotFoundException fnf) { + System.out.println("Logs not available at " + + remoteAppNodeLogDir.toString()); System.out .println("Log aggregation has not completed or is not enabled."); return -1; } - return dumpAContainerLogs(containerId, reader, System.out); + boolean foundContainerLogs = false; + while (nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + if (!thisNodeFile.getPath().getName() + 
.equals(LogAggregationUtils.TMP_FILE)) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(getConf(), new Path( + remoteAppNodeLogDir, thisNodeFile.getPath().getName())); + if (dumpAContainerLogs(containerId, reader, System.out) > -1) { + foundContainerLogs = true; + } + } + } + if (!foundContainerLogs) { + System.out.println("Logs for container " + containerId + + " are not present in this log-file."); + } + return 0; } @Private @@ -81,8 +104,6 @@ public int dumpAContainerLogs(String containerIdStr, } if (valueStream == null) { - System.out.println("Logs for container " + containerIdStr - + " are not present in this log-file."); return -1; } @@ -99,55 +120,77 @@ public int dumpAContainerLogs(String containerIdStr, @Private public int dumpAllContainersLogs(ApplicationId appId, String appOwner, PrintStream out) throws IOException { - Path remoteRootLogDir = new Path(getConf().get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + Path remoteRootLogDir = + new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); String user = appOwner; - String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()); + String logDirSuffix = + LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf()); // TODO Change this to get a list of files from the LAS. 
- Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, appId, user, logDirSuffix); + Path remoteAppLogDir = + LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user, + logDirSuffix); RemoteIterator nodeFiles; try { Path qualifiedLogDir = FileContext.getFileContext(getConf()).makeQualified(remoteAppLogDir); - nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), - getConf()).listStatus(remoteAppLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), getConf()) + .listStatus(remoteAppLogDir); } catch (FileNotFoundException fnf) { System.out.println("Logs not available at " + remoteAppLogDir.toString()); System.out - .println("Log aggregation has not completed or is not enabled."); + .println("Log aggregation has not completed or is not enabled."); return -1; } while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader( - getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName())); - try { - - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null) { - String containerString = "\n\nContainer: " + key + " on " - + thisNodeFile.getPath().getName(); - out.println(containerString); - out.println(StringUtils.repeat("=", containerString.length())); - while (true) { + FileStatus thisNodeDir = nodeFiles.next(); + if (thisNodeDir.isDirectory()) { + RemoteIterator logFiles; + try { + Path qualifiedLogDir = + FileContext.getFileContext(getConf()).makeQualified( + thisNodeDir.getPath()); + logFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), getConf()) + .listStatus(thisNodeDir.getPath()); + } catch (FileNotFoundException fnf) { + continue; + } + while (logFiles.hasNext()) { + FileStatus logFile = logFiles.next(); + if (!logFile.getPath().getName().equals(LogAggregationUtils.TMP_FILE)) { + AggregatedLogFormat.LogReader reader 
= + new AggregatedLogFormat.LogReader(getConf(),logFile.getPath()); try { - LogReader.readAContainerLogsForALogType(valueStream, out); - } catch (EOFException eof) { - break; + + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + + while (valueStream != null) { + String containerString = + "\n\nContainer: " + key + " on " + + logFile.getPath().getName(); + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); + while (true) { + try { + LogReader.readAContainerLogsForALogType(valueStream, out); + } catch (EOFException eof) { + break; + } + } + + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); } } - - // Next container - key = new LogKey(); - valueStream = reader.next(key); } - } finally { - reader.close(); } } return 0; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c6572e9..caacef8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -178,6 +178,7 @@ public static void setup() throws Exception { "http", "localhost", 8080, "file0")); typeValueCache.put(SerializedException.class, SerializedException.newInstance(new IOException("exception for test"))); + generateByNewInstance(LogContext.class); generateByNewInstance(ApplicationId.class); generateByNewInstance(ApplicationAttemptId.class); generateByNewInstance(ContainerId.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 12166e0..eb8b8c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -72,9 +72,11 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -275,11 +277,16 @@ private void recoverApplication(ContainerManagerApplicationProto p) aclProto.getAcl()); } + LogContext logContext = null; + if (p.getLogContext() != null) { + logContext = new LogContextPBImpl(p.getLogContext()); + } + LOG.info("Recovering application " + appId); ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); - app.handle(new ApplicationInitEvent(appId, acls)); + app.handle(new ApplicationInitEvent(appId, acls, logContext)); } @SuppressWarnings("unchecked") @@ -719,13 +726,15 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, private ContainerManagerApplicationProto buildAppProto(ApplicationId 
appId, String user, Credentials credentials, - Map appAcls) { + Map appAcls, LogContext logContext) { ContainerManagerApplicationProto.Builder builder = ContainerManagerApplicationProto.newBuilder(); builder.setId(((ApplicationIdPBImpl) appId).getProto()); builder.setUser(user); - + if (logContext != null) { + builder.setLogContext(((LogContextPBImpl) logContext).getProto()); + } builder.clearCredentials(); if (credentials != null) { DataOutputBuffer dob = new DataOutputBuffer(); @@ -828,10 +837,12 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, LOG.info("Creating a new application reference for app " + applicationID); Map appAcls = container.getLaunchContext().getApplicationACLs(); + LogContext logContext = container.getLaunchContext().getLogContext(); context.getNMStateStore().storeApplication(applicationID, - buildAppProto(applicationID, user, credentials, appAcls)); + buildAppProto(applicationID, user, credentials, appAcls, + logContext)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, appAcls)); + new ApplicationInitEvent(applicationID, appAcls, logContext)); } this.context.getNMStateStore().storeContainer(containerId, request); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index cc5544c..5eeb37b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -237,7 +237,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, - app.applicationACLs)); + app.applicationACLs, initEvent.getLogContext())); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java index 5746b6a..fdd828f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java @@ -22,18 +22,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogContext; public class ApplicationInitEvent extends ApplicationEvent { private final Map applicationACLs; + private final LogContext logContext; public ApplicationInitEvent(ApplicationId appId, Map acls) { super(appId, ApplicationEventType.INIT_APPLICATION); this.applicationACLs = acls; + this.logContext = null; + } + + public ApplicationInitEvent(ApplicationId appId, + Map acls, LogContext logContext) { + super(appId, ApplicationEventType.INIT_APPLICATION); + 
this.applicationACLs = acls; + this.logContext = logContext; } public Map getApplicationACLs() { return this.applicationACLs; } + + public LogContext getLogContext() { + return this.logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 1af48bb..8d7c649 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -20,14 +20,17 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,35 +39,44 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogContext; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; import 
org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + public class AppLogAggregatorImpl implements AppLogAggregator { private static final Log LOG = LogFactory .getLog(AppLogAggregatorImpl.class); private static final int THREAD_SLEEP_TIME = 1000; - private static final String TMP_FILE_SUFFIX = ".tmp"; private final LocalDirsHandlerService dirsHandler; private final Dispatcher dispatcher; private final ApplicationId appId; + private final NodeId nodeId; private final String applicationId; private boolean logAggregationDisabled = false; private final Configuration conf; private final DeletionService delService; private final UserGroupInformation userUgi; - private final Path remoteNodeLogFileForApp; - private final Path remoteNodeTmpLogFileForApp; + private final Path remoteNodeLogDirForApp; + private Path remoteNodeTmpLogFileForApp; private final ContainerLogsRetentionPolicy retentionPolicy; private final BlockingQueue pendingContainers; @@ -72,53 +84,37 @@ private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final 
AtomicBoolean aborted = new AtomicBoolean(); private final Map appAcls; - - private LogWriter writer = null; + private final LogContext logContext; + private final Context context; + private String remoteLogFileName; + private Set currentUploadedFiles = new HashSet(); + private Set alreadyUploadedLogs = new HashSet(); + private Set currentExistingLogFiles = new HashSet(); public AppLogAggregatorImpl(Dispatcher dispatcher, - DeletionService deletionService, Configuration conf, ApplicationId appId, - UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler, - Path remoteNodeLogFileForApp, + DeletionService deletionService, Configuration conf, NodeId nodeId, + ApplicationId appId, UserGroupInformation userUgi, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogDirForApp, ContainerLogsRetentionPolicy retentionPolicy, - Map appAcls) { + Map appAcls, LogContext logContext, + Context context) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; + this.nodeId = nodeId; this.appId = appId; this.applicationId = ConverterUtils.toString(appId); this.userUgi = userUgi; this.dirsHandler = dirsHandler; - this.remoteNodeLogFileForApp = remoteNodeLogFileForApp; - this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp(); + this.remoteNodeLogDirForApp = remoteNodeLogDirForApp; this.retentionPolicy = retentionPolicy; this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; + this.logContext = logContext; + this.context = context; } - private void uploadLogsForContainer(ContainerId containerId) { - - if (this.logAggregationDisabled) { - return; - } - - // Lazy creation of the writer - if (this.writer == null) { - LOG.info("Starting aggregate log-file for app " + this.applicationId - + " at " + this.remoteNodeTmpLogFileForApp); - try { - this.writer = - new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); - //Write ACLs once when and if the writer is created. 
- this.writer.writeApplicationACLs(appAcls); - this.writer.writeApplicationOwner(this.userUgi.getShortUserName()); - } catch (IOException e) { - LOG.error("Cannot create writer for app " + this.applicationId - + ". Disabling log-aggregation for this app.", e); - this.logAggregationDisabled = true; - return; - } - } + private void uploadLogsForContainer(ContainerId containerId, LogWriter writer) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " @@ -126,13 +122,89 @@ private void uploadLogsForContainer(ContainerId containerId) { LogKey logKey = new LogKey(containerId); LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId, - userUgi.getShortUserName()); + userUgi.getShortUserName(), this.logContext, this.alreadyUploadedLogs); try { - this.writer.append(logKey, logValue); + writer.append(logKey, logValue); } catch (IOException e) { LOG.error("Couldn't upload logs for " + containerId + ". Skipping this container."); } + currentUploadedFiles.addAll(logValue.getCurrentUpLoadedFilesPath()); + alreadyUploadedLogs.addAll(logValue.getCurrentUpLoadedFileMeta()); + currentExistingLogFiles.addAll(logValue.getAllExistingFilesMeta()); + } + + private void uploadLogsForContainers(Set containers) { + if (this.logAggregationDisabled || containers.isEmpty()) { + return; + } + this.remoteLogFileName = + nodeId.toString().replace(":", "_") + "_" + System.currentTimeMillis(); + this.remoteNodeTmpLogFileForApp = + getRemoteNodeTmpLogFileForApp(this.remoteNodeLogDirForApp); + + LogWriter writer = null; + try { + try { + writer = + new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, + this.userUgi); + // Write ACLs once when the writer is created. + writer.writeApplicationACLs(appAcls); + writer.writeApplicationOwner(this.userUgi.getShortUserName()); + + } catch (IOException e1) { + LOG.error("Cannot create writer for app " + this.applicationId + + ". Skip log upload this time. 
"); + return; + } + + currentUploadedFiles.clear(); + currentExistingLogFiles.clear(); + for (ContainerId container : containers) { + uploadLogsForContainer(container, writer); + } + + this.delService.delete(this.userUgi.getShortUserName(), null, + currentUploadedFiles.toArray(new Path[currentUploadedFiles.size()])); + + // if any of the previous uploaded logs have been deleted, + // we need to remove them from alreadyUploadedLogs + Iterable mask = + Iterables.filter(alreadyUploadedLogs, new Predicate() { + @Override + public boolean apply(String next) { + return currentExistingLogFiles.contains(next); + } + }); + + alreadyUploadedLogs = Sets.newHashSet(mask); + + if (writer != null) { + writer.close(); + } + + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + FileSystem remoteFS = FileSystem.get(conf); + remoteFS.rename(remoteNodeTmpLogFileForApp, new Path( + remoteNodeLogDirForApp, remoteLogFileName)); + return null; + } + }); + } catch (Exception e) { + LOG.error( + "Failed to move temporary log file to final location: [" + + remoteNodeTmpLogFileForApp + "] to [" + + remoteNodeLogDirForApp + "]", e); + } + } finally { + if (writer != null) { + writer.close(); + } + } } @Override @@ -149,12 +221,19 @@ public void run() { @SuppressWarnings("unchecked") private void doAppLogAggregation() { - ContainerId containerId; - while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { - wait(THREAD_SLEEP_TIME); + if (this.logContext != null && this.logContext.getLogInterval() > 0) { + wait(this.logContext.getLogInterval() * 24 * 3600 * 1000); + if (this.appFinishing.get() || this.aborted.get()) { + break; + } + uploadLogsForContainers(this.context.getApplications() + .get(this.appId).getContainers().keySet()); + } else { + wait(THREAD_SLEEP_TIME); + } } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); this.appFinishing.set(true); @@ -166,10 +245,13 @@ 
private void doAppLogAggregation() { return; } - // Application is finished. Finish pending-containers + // App is finished, upload the container logs. + Set ids = new HashSet(); + ContainerId containerId; while ((containerId = this.pendingContainers.poll()) != null) { - uploadLogsForContainer(containerId); + ids.add(containerId); } + uploadLogsForContainers(ids); // Remove the local app-log-dirs List rootLogDirs = dirsHandler.getLogDirs(); @@ -181,26 +263,6 @@ private void doAppLogAggregation() { } this.delService.delete(this.userUgi.getShortUserName(), null, localAppLogDirs); - - if (this.writer != null) { - this.writer.close(); - LOG.info("Finished aggregate log-file for app " + this.applicationId); - } - - try { - userUgi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - FileSystem remoteFS = FileSystem.get(conf); - remoteFS.rename(remoteNodeTmpLogFileForApp, remoteNodeLogFileForApp); - return null; - } - }); - } catch (Exception e) { - LOG.error("Failed to move temporary log file to final location: [" - + remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp - + "]", e); - } this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, @@ -208,9 +270,8 @@ public Object run() throws Exception { this.appAggregationFinished.set(true); } - private Path getRemoteNodeTmpLogFileForApp() { - return new Path(remoteNodeLogFileForApp.getParent(), - (remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX)); + private Path getRemoteNodeTmpLogFileForApp(Path remoteNodeLogDirForApp) { + return new Path(remoteNodeLogDirForApp, LogAggregationUtils.TMP_FILE); } private boolean shouldUploadLogs(ContainerId containerId, @@ -267,4 +328,11 @@ public synchronized void abortLogAggregation() { this.aborted.set(true); this.notifyAll(); } + + @Private + @VisibleForTesting + public synchronized void doLogAggregationOutOfBand() { + LOG.info("Do OutOfBand log aggregation"); + this.notifyAll(); + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 58e1837..b6011c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -58,7 +59,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LogAggregationService extends AbstractService implements @@ -223,6 +227,12 @@ Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) { this.remoteRootLogDirSuffix); } + Path 
getRemoteNodeLogDirForApp(ApplicationId appId, String user) { + return LogAggregationUtils.getRemoteNodeLogDirForApp( + this.remoteRootLogDir, appId, user, this.nodeId, + this.remoteRootLogDirSuffix); + } + private void createDir(FileSystem fs, Path path, FsPermission fsPerm) throws IOException { FsPermission dirPerm = new FsPermission(fsPerm); @@ -259,33 +269,42 @@ public Object run() throws Exception { // Only creating directories if they are missing to avoid // unnecessary load on the filesystem from all of the nodes - Path appDir = LogAggregationUtils.getRemoteAppLogDir( + Path node_appDir = LogAggregationUtils.getRemoteNodeLogDirForApp( LogAggregationService.this.remoteRootLogDir, appId, user, - LogAggregationService.this.remoteRootLogDirSuffix); - appDir = appDir.makeQualified(remoteFS.getUri(), + nodeId, LogAggregationService.this.remoteRootLogDirSuffix); + node_appDir = node_appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - - if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { - Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( - LogAggregationService.this.remoteRootLogDir, user, + if (! 
checkExists(remoteFS, node_appDir, APP_DIR_PERMISSIONS)) { + Path appDir = LogAggregationUtils.getRemoteAppLogDir( + LogAggregationService.this.remoteRootLogDir, appId, user, LogAggregationService.this.remoteRootLogDirSuffix); - suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + appDir = appDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { - Path userDir = LogAggregationUtils.getRemoteLogUserDir( - LogAggregationService.this.remoteRootLogDir, user); - userDir = userDir.makeQualified(remoteFS.getUri(), + if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + LogAggregationService.this.remoteRootLogDir, user, + LogAggregationService.this.remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( + LogAggregationService.this.remoteRootLogDir, user); + userDir = userDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); } - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + createDir(remoteFS, node_appDir, APP_DIR_PERMISSIONS); } } catch (IOException e) { LOG.error("Failed to setup application log directory for " @@ -303,11 +322,12 @@ public Object run() throws Exception { @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy 
logRetentionPolicy, - Map appAcls) { + Map appAcls, LogContext logContext) { ApplicationEvent eventResponse; try { verifyAndCreateRemoteLogDir(getConfig()); - initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); + initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls, + logContext); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); } catch (YarnRuntimeException e) { @@ -320,7 +340,7 @@ private void initApp(final ApplicationId appId, String user, protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, - Map appAcls) { + Map appAcls, LogContext logContext) { // Get user's FileSystem credentials final UserGroupInformation userUgi = @@ -332,9 +352,9 @@ protected void initAppAggregator(final ApplicationId appId, String user, // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, - getConfig(), appId, userUgi, dirsHandler, - getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, - appAcls); + getConfig(), nodeId, appId, userUgi, dirsHandler, + getRemoteNodeLogDirForApp(appId, user), logRetentionPolicy, + appAcls, logContext, this.context); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } @@ -421,7 +441,7 @@ public void handle(LogHandlerEvent event) { initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), appStartEvent.getCredentials(), appStartEvent.getLogRetentionPolicy(), - appStartEvent.getApplicationAcls()); + appStartEvent.getApplicationAcls(), appStartEvent.getLogContext()); break; case CONTAINER_FINISHED: LogHandlerContainerFinishedEvent containerFinishEvent = @@ -439,4 +459,14 @@ public void handle(LogHandlerEvent event) { } } + + public Clock createSystemClock() { + return new SystemClock(); + } + + @Private 
+ @VisibleForTesting + public ConcurrentMap getAppLogAggregators() { + return this.appLogAggregators; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index 6c07674..84c5930 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -23,6 +23,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; public class LogHandlerAppStartedEvent extends LogHandlerEvent { @@ -32,6 +33,7 @@ private final String user; private final Credentials credentials; private final Map appAcls; + private final LogContext logContext; public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, @@ -42,6 +44,19 @@ public LogHandlerAppStartedEvent(ApplicationId appId, String user, this.credentials = credentials; this.retentionPolicy = retentionPolicy; this.appAcls = appAcls; + this.logContext = null; + } + + public LogHandlerAppStartedEvent(ApplicationId appId, String user, + Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, + 
Map appAcls, LogContext logContext) { + super(LogHandlerEventType.APPLICATION_STARTED); + this.applicationId = appId; + this.user = user; + this.credentials = credentials; + this.retentionPolicy = retentionPolicy; + this.appAcls = appAcls; + this.logContext = logContext; } public ApplicationId getApplicationId() { @@ -64,4 +79,7 @@ public String getUser() { return this.appAcls; } + public LogContext getLogContext() { + return this.logContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index e6f39f6..37501c7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -29,6 +29,7 @@ message ContainerManagerApplicationProto { optional string user = 2; optional bytes credentials = 3; repeated ApplicationACLMapProto acls = 4; + optional LogContextProto log_context = 5; } message DeletionServiceDeleteTaskProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 6ab594f..e947c18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -37,6 +37,7 @@ import java.io.DataInputStream; import java.io.EOFException; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; @@ -50,14 +51,18 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.junit.Assert; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; @@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -88,26 +94,29 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.Context; import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mortbay.util.MultiException; - - //@Ignore public class TestLogAggregationService extends BaseContainerManagerTest { @@ -178,7 +187,8 @@ public void testLocalFileDeletionAfterUpload() throws Exception { BuilderUtils.newApplicationAttemptId(application1, 1); ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1); // Simulate log-file creation - writeContainerLogs(app1LogDir, container11); + writeContainerLogs(app1LogDir, container11, new String[] { "stdout", + "stderr", "syslog" }); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container11, 0)); @@ -258,10 +268,21 @@ public void testNoContainerOnNode() 
throws Exception { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); - Assert.assertFalse(new File(logAggregationService + // Node directory after ApplicationId should have been existed + Assert.assertTrue(new File(logAggregationService .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) .exists()); - + Path nodeLogDir = + logAggregationService.getRemoteNodeLogFileForApp(application1, + this.user); + + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified(nodeLogDir); + RemoteIterator nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(nodeLogDir); + Assert.assertFalse(nodeFiles.hasNext()); + dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ @@ -283,7 +304,7 @@ public void testMultipleAppsLogAggregation() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - + String[] fileNames = new String[] { "stdout", "stderr", "syslog" }; DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); dispatcher.register(ApplicationEventType.class, appEventHandler); @@ -310,7 +331,7 @@ public void testMultipleAppsLogAggregation() throws Exception { ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); // Simulate log-file creation - writeContainerLogs(app1LogDir, container11); + writeContainerLogs(app1LogDir, container11, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container11, 0)); @@ -328,13 +349,13 @@ public void testMultipleAppsLogAggregation() throws Exception { ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1); - writeContainerLogs(app2LogDir, container21); + writeContainerLogs(app2LogDir, container21, fileNames); logAggregationService.handle( new 
LogHandlerContainerFinishedEvent(container21, 0)); ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2); - writeContainerLogs(app1LogDir, container12); + writeContainerLogs(app1LogDir, container12, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container12, 0)); @@ -365,22 +386,22 @@ public void testMultipleAppsLogAggregation() throws Exception { reset(appEventHandler); ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); - writeContainerLogs(app3LogDir, container31); + writeContainerLogs(app3LogDir, container31, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container31, 0)); ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2); - writeContainerLogs(app3LogDir, container32); + writeContainerLogs(app3LogDir, container32, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container32, 1)); // Failed ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2); - writeContainerLogs(app2LogDir, container22); + writeContainerLogs(app2LogDir, container22, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container22, 0)); ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3); - writeContainerLogs(app3LogDir, container33); + writeContainerLogs(app3LogDir, container33, fileNames); logAggregationService.handle( new LogHandlerContainerFinishedEvent(container33, 0)); @@ -395,11 +416,11 @@ public void testMultipleAppsLogAggregation() throws Exception { assertEquals(0, logAggregationService.getNumAggregators()); verifyContainerLogs(logAggregationService, application1, - new ContainerId[] { container11, container12 }); + new ContainerId[] { container11, container12 }, fileNames, 3, 1); verifyContainerLogs(logAggregationService, application2, - new ContainerId[] { container21 }); + new ContainerId[] { container21 }, fileNames, 3, 1); 
verifyContainerLogs(logAggregationService, application3, - new ContainerId[] { container31, container32 }); + new ContainerId[] { container31, container32 }, fileNames, 3, 1); dispatcher.await(); @@ -588,10 +609,10 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { ApplicationId appId = BuilderUtils.newApplicationId( System.currentTimeMillis(), (int)Math.random()); - doThrow(new YarnRuntimeException("KABOOM!")) - .when(logAggregationService).initAppAggregator( - eq(appId), eq(user), any(Credentials.class), - any(ContainerLogsRetentionPolicy.class), anyMap()); + doThrow(new YarnRuntimeException("KABOOM!")).when(logAggregationService) + .initAppAggregator(eq(appId), eq(user), any(Credentials.class), + any(ContainerLogsRetentionPolicy.class), anyMap(), + any(LogContext.class)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, @@ -672,25 +693,47 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() assertEquals(0, logAggregationService.getNumAggregators()); } - private void writeContainerLogs(File appLogDir, ContainerId containerId) - throws IOException { + private void writeContainerLogs(File appLogDir, ContainerId containerId, + String[] fileName) throws IOException { // ContainerLogDir should be created String containerStr = ConverterUtils.toString(containerId); File containerLogDir = new File(appLogDir, containerStr); containerLogDir.mkdir(); - for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { + for (String fileType : fileName) { Writer writer11 = new FileWriter(new File(containerLogDir, fileType)); writer11.write(containerStr + " Hello " + fileType + "!"); writer11.close(); } } - private void verifyContainerLogs( - LogAggregationService logAggregationService, ApplicationId appId, - ContainerId[] expectedContainerIds) throws IOException { + private void verifyContainerLogs(LogAggregationService logAggregationService, + ApplicationId appId, ContainerId[] 
expectedContainerIds, + String[] logFiles, int numOfContainerLogs, int iteratorNum) + throws IOException { + Path nodeLogDir = + logAggregationService.getRemoteNodeLogFileForApp(appId, this.user); + RemoteIterator nodeFiles = null; + try { + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified( + nodeLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(nodeLogDir); + } catch (FileNotFoundException fnf) { + Assert.fail("Should have log files"); + } + + Assert.assertTrue(nodeFiles.hasNext()); + FileStatus thisNodeFile = null; + while (iteratorNum > 0) { + thisNodeFile = nodeFiles.next(); + iteratorNum--; + } + Assert.assertNotNull(thisNodeFile); AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(this.conf, - logAggregationService.getRemoteNodeLogFileForApp(appId, this.user)); + thisNodeFile.getPath()); Assert.assertEquals(this.user, reader.getApplicationOwner()); verifyAcls(reader.getApplicationAcls()); @@ -749,8 +792,8 @@ private void verifyContainerLogs( for (ContainerId cId : expectedContainerIds) { String containerStr = ConverterUtils.toString(cId); Map thisContainerMap = logMap.remove(containerStr); - Assert.assertEquals(3, thisContainerMap.size()); - for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { + Assert.assertEquals(numOfContainerLogs, thisContainerMap.size()); + for (String fileType : logFiles) { String expectedValue = containerStr + " Hello " + fileType + "!"; LOG.info("Expected log-content : " + new String(expectedValue)); String foundValue = thisContainerMap.remove(fileType); @@ -987,4 +1030,266 @@ private static String eventToString(Event event, String[] methods) throws Exc sb.append("]"); return sb.toString(); } + + @Test (timeout = 50000) + @SuppressWarnings("unchecked") + public void testLogAggregationServiceWithPatterns() throws Exception { + + LogContext logContextWithIncludePatterns = + Records.newRecord(LogContext.class); + Set 
includePatterns = new HashSet(); + includePatterns.add("stdout"); + includePatterns.add("syslog"); + logContextWithIncludePatterns.setLogIncludePatterns(includePatterns); + + LogContext logContextWithExcludePatterns = + Records.newRecord(LogContext.class); + Set excludePatterns = new HashSet(); + excludePatterns.add("stdout"); + excludePatterns.add("syslog"); + + logContextWithExcludePatterns.setLogExcludePatterns(excludePatterns); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + + // LogContext for application1 has includePatten which includes + // stdout and syslog. 
+ // After logAggregation is finished, we expect the logs for application1 + // has only logs from stdout and syslog + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + + // AppLogDir should be created + File appLogDir1 = + new File(localLogDir, ConverterUtils.toString(application1)); + appLogDir1.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application1, + this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + logContextWithIncludePatterns)); + + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(application1, 1); + ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1); + + // Simulate log-file creation + writeContainerLogs(appLogDir1, container1, new String[] { "stdout", + "stderr", "syslog" }); + logAggregationService.handle(new LogHandlerContainerFinishedEvent( + container1, 0)); + + // LogContext for application2 has excludePatten which includes + // stdout and syslog. + // After logAggregation is finished, we expect the logs for application2 + // has only logs from stderr + ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2); + ApplicationAttemptId appAttemptId2 = + BuilderUtils.newApplicationAttemptId(application2, 1); + + File app2LogDir = + new File(localLogDir, ConverterUtils.toString(application2)); + app2LogDir.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application2, + this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, + this.acls, logContextWithExcludePatterns)); + ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1); + + writeContainerLogs(app2LogDir, container2, new String[] { "stdout", + "stderr", "syslog" }); + logAggregationService.handle( + new LogHandlerContainerFinishedEvent(container2, 0)); + + dispatcher.await(); + ApplicationEvent expectedInitEvents[] = + new ApplicationEvent[] { new ApplicationEvent(application1, + 
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) }; + checkEvents(appEventHandler, expectedInitEvents, false, "getType", + "getApplicationID"); + reset(appEventHandler); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); + logAggregationService.handle(new LogHandlerAppFinishedEvent(application2)); + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + + String[] logFiles = new String[] { "stdout", "syslog" }; + verifyContainerLogs(logAggregationService, application1, + new ContainerId[] { container1 }, logFiles, 2, 1); + + logFiles = new String[] { "stderr" }; + verifyContainerLogs(logAggregationService, application2, + new ContainerId[] { container2 }, logFiles, 1, 1); + + dispatcher.await(); + + ApplicationEvent[] expectedFinishedEvents = + new ApplicationEvent[] { new ApplicationEvent(application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), + new ApplicationEvent(application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}; + checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", + "getApplicationID"); + dispatcher.stop(); + } + + @SuppressWarnings("unchecked") + @Test (timeout = 50000) + public void testLogAggregationServiceWithInterval() throws Exception { + + int max_attempt = 50; + + LogContext logContextWithInterval = + Records.newRecord(LogContext.class); + logContextWithInterval.setLogInterval(5000); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + ApplicationId 
application = BuilderUtils.newApplicationId(123456, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application, 1); + ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1); + + Context context = spy(this.context); + ConcurrentMap maps = + new ConcurrentHashMap(); + Application app = mock(Application.class); + Map containers = new HashMap(); + containers.put(container, mock(Container.class)); + maps.put(application, app); + when(app.getContainers()).thenReturn(containers); + when(context.getApplications()).thenReturn(maps); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, context, this.delSrvc, + super.dirsHandler); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + // AppLogDir should be created + File appLogDir = + new File(localLogDir, ConverterUtils.toString(application)); + appLogDir.mkdir(); + logAggregationService.handle(new LogHandlerAppStartedEvent(application, + this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, + logContextWithInterval)); + + // Simulate log-file creation + String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" }; + writeContainerLogs(appLogDir, container, logFiles1); + + // Do log aggregation + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(application); + aggregator.doLogAggregationOutOfBand(); + + // Make sure the log is uploaded + int count = 0; + while(count <= max_attempt) { + if (checkLogFileAvailable(logAggregationService, application, 1)) { + break; + } + Thread.sleep(200); + count ++; + } + + Assert.assertTrue(checkLogFileAvailable(logAggregationService, + application, 1)); + // Container logs should be uploaded + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFiles1, 3, 1); + + // Do log aggregation + String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" 
}; + writeContainerLogs(appLogDir, container, logFiles2); + + aggregator.doLogAggregationOutOfBand(); + + // Make sure the log is uploaded + count = 0; + while(count <= max_attempt) { + if (checkLogFileAvailable(logAggregationService, application, 2)) { + break; + } + Thread.sleep(200); + count ++; + } + + Assert.assertTrue(checkLogFileAvailable(logAggregationService, application, 2)); + + // Container logs should be uploaded + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFiles2, 3, 2); + + // create another logs + String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" }; + writeContainerLogs(appLogDir, container, logFiles3); + logAggregationService.handle( + new LogHandlerContainerFinishedEvent(container, 0)); + + dispatcher.await(); + logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + // Make sure the log is uploaded + count = 0; + while(count <= max_attempt) { + if (checkLogFileAvailable(logAggregationService, application, 3)) { + break; + } + Thread.sleep(200); + count ++; + } + + Assert.assertTrue(checkLogFileAvailable(logAggregationService, application, 3)); + verifyContainerLogs(logAggregationService, application, + new ContainerId[] { container }, logFiles3, 3, 3); + dispatcher.stop(); + + } + + private boolean checkLogFileAvailable( + LogAggregationService logAggregationService, ApplicationId appId, + int numOfLogFiles) throws IOException { + Path nodeLogDir = + logAggregationService.getRemoteNodeLogFileForApp(appId, this.user); + RemoteIterator nodeFiles = null; + try { + Path qualifiedLogDir = + FileContext.getFileContext(this.conf).makeQualified(nodeLogDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), this.conf) + .listStatus(nodeLogDir); + } catch (FileNotFoundException fnf) { + return false; + } + int count = 0; + while (nodeFiles.hasNext()) { + 
FileStatus status = nodeFiles.next(); + if (!status.getPath().getName().contains(LogAggregationUtils.TMP_FILE)) { + count++; + } + } + return count == numOfLogFiles; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 0dd9ba1..d203bed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -188,6 +188,9 @@ private ContainerLaunchContext createAMContainerLaunchContext( // Construct the actual Container ContainerLaunchContext container = applicationMasterContext.getAMContainerSpec(); + if (applicationMasterContext.getLogContext() != null) { + container.setLogContext(applicationMasterContext.getLogContext()); + } LOG.info("Command to launch container " + containerID + " : "