diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 4c48cf5..0adee53 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -248,6 +248,12 @@
public static final String DEFAULT_LOG_LEVEL = "INFO";
+ public static final String LOG_INCLUDE_PATTERNS = "mapreduce.log.include.patterns";
+
+ public static final String LOG_EXCLUDE_PATTERNS = "mapreduce.log.exclude.patterns";
+
+ public static final String LOG_INTERVAL = "mapreduce.log.interval";
+
public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 703a103..80e2f81 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1489,4 +1489,33 @@
- HTTPS_ONLY : Service is provided only on https
+
+<property>
+  <name>mapreduce.log.include.patterns</name>
+  <description>
+    This configuration defines the include patterns used to filter
+    the log files. Log files that match one of the include patterns
+    will be uploaded. It takes a comma-separated list of wildcard
+    patterns.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.log.exclude.patterns</name>
+  <description>
+    This configuration defines the exclude patterns used to filter
+    the log files. Log files that match one of the exclude patterns
+    will not be uploaded. It takes a comma-separated list of regular
+    expressions.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.log.interval</name>
+  <description>
+    This configuration defines how often, in milliseconds, the log
+    aggregation service uploads container logs.
+  </description>
+</property>
+
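A minimal sketch of how a job could opt in to the new filtered, periodic log
aggregation via these properties. The keys are the ones added to MRJobConfig
above; the setup boilerplate and concrete pattern values are illustrative
assumptions. Note the asymmetry in the implementation (see the
AggregatedLogFormat changes below): include patterns are file-name wildcards,
while exclude patterns are Java regular expressions.

    import org.apache.hadoop.conf.Configuration;

    public class LogAggregationConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Upload only syslog- and stdout-style files (wildcard patterns).
        conf.set("mapreduce.log.include.patterns", "syslog*,stdout*");
        // Never upload anything whose name contains ".tmp" (regex).
        conf.set("mapreduce.log.exclude.patterns", "\\.tmp");
        // Upload container logs every five minutes while the app runs.
        conf.setLong("mapreduce.log.interval", 5 * 60 * 1000L);
      }
    }
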
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 5120c85..70b478e 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
import org.apache.commons.logging.Log;
@@ -76,6 +77,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -85,6 +87,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import com.google.common.annotations.VisibleForTesting;
@@ -503,6 +506,25 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}
+    LogContext logContext = Records.newRecord(LogContext.class);
+    if (conf.getLong(MRJobConfig.LOG_INTERVAL, 0) != 0) {
+      logContext.setLogInterval(conf.getLong(MRJobConfig.LOG_INTERVAL, 0));
+    }
+    if (conf.get(MRJobConfig.LOG_INCLUDE_PATTERNS) != null) {
+      Set<String> includePattern = new HashSet<String>();
+      for (String str : conf.get(MRJobConfig.LOG_INCLUDE_PATTERNS).split(",")) {
+        includePattern.add(str);
+      }
+      logContext.setLogIncludePatterns(includePattern);
+    }
+    if (conf.get(MRJobConfig.LOG_EXCLUDE_PATTERNS) != null) {
+      Set<String> excludePattern = new HashSet<String>();
+      for (String str : conf.get(MRJobConfig.LOG_EXCLUDE_PATTERNS).split(",")) {
+        excludePattern.add(str);
+      }
+      logContext.setLogExcludePatterns(excludePattern);
+    }
+    appContext.setLogContext(logContext);
return appContext;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 1ee04f0..5c97425 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -103,6 +103,21 @@ public static ApplicationSubmissionContext newInstance(
resource, null);
}
+ @Public
+ @Unstable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers, LogContext logContext) {
+ ApplicationSubmissionContext context =
+ newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers);
+ context.setLogContext(logContext);
+ return context;
+ }
/**
* Get the ApplicationId of the submitted application.
* @return ApplicationId of the submitted application
@@ -338,4 +353,12 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Public
@Stable
public abstract void setApplicationTags(Set<String> tags);
+
+  @Public
+  @Unstable
+  public abstract LogContext getLogContext();
+
+  @Public
+  @Unstable
+  public abstract void setLogContext(LogContext logContext);
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
index a648fef..cd20939 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
@@ -22,8 +22,10 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
@@ -46,6 +48,7 @@
*   <li>Optional, application-specific binary service data.</li>
*   <li>Environment variables for the launched process.</li>
*   <li>Command to launch the container.</li>
+ *   <li>{@link LogContext}, which is set by the RM when it launches the AM.</li>
*
*
*
@@ -73,6 +76,18 @@ public static ContainerLaunchContext newInstance(
return container;
}
+  @Private
+  @Unstable
+  public static ContainerLaunchContext newInstance(
+      Map<String, LocalResource> localResources,
+      Map<String, String> environment, List<String> commands,
+      Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
+      Map<ApplicationAccessType, String> acls, LogContext logContext) {
+    ContainerLaunchContext container = newInstance(localResources, environment,
+        commands, serviceData, tokens, acls);
+    container.setLogContext(logContext);
+    return container;
+  }
/**
* Get all the tokens needed by this container. It may include file-system
* tokens, ApplicationMaster related tokens if this container is an
@@ -196,4 +211,12 @@ public static ContainerLaunchContext newInstance(
@Public
@Stable
public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+
+  @Private
+  @Unstable
+  public abstract LogContext getLogContext();
+
+  @Private
+  @Unstable
+  public abstract void setLogContext(LogContext logContext);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
new file mode 100644
index 0000000..a235653
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+@Public
+@Unstable
+public abstract class LogContext {
+
+ @Public
+ @Unstable
+  public static LogContext newInstance(Set<String> logIncludePattern,
+      Set<String> logExcludePattern, long logInterval) {
+ LogContext context = Records.newRecord(LogContext.class);
+ context.setLogIncludePatterns(logIncludePattern);
+ context.setLogExcludePatterns(logExcludePattern);
+ context.setLogInterval(logInterval);
+ return context;
+ }
+
+  @Public
+  @Unstable
+  public abstract Set<String> getLogIncludePatterns();
+
+  @Public
+  @Unstable
+  public abstract void setLogIncludePatterns(Set<String> logPattern);
+
+  @Public
+  @Unstable
+  public abstract Set<String> getLogExcludePatterns();
+
+  @Public
+  @Unstable
+  public abstract void setLogExcludePatterns(Set<String> logPattern);
+
+  @Public
+  @Unstable
+  public abstract long getLogInterval();
+
+  @Public
+  @Unstable
+  public abstract void setLogInterval(long logInterval);
+}
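A sketch of how a YARN client might use this record when submitting an
application, assuming the usual YarnClient submission flow around it; the
pattern values are illustrative and appContext is assumed to be the client's
ApplicationSubmissionContext:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.yarn.api.records.LogContext;

    Set<String> includes = new HashSet<String>(Arrays.asList("syslog*"));
    Set<String> excludes = new HashSet<String>(Arrays.asList("\\.tmp"));
    // Upload matching container logs once a minute.
    LogContext logContext =
        LogContext.newInstance(includes, excludes, 60 * 1000L);
    appContext.setLogContext(logContext);
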
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3f1fa6c..f758d97 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -288,6 +288,13 @@ message ApplicationSubmissionContextProto {
optional string applicationType = 10 [default = "YARN"];
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
+ optional LogContextProto log_context = 13;
+}
+
+message LogContextProto {
+ repeated string log_include_pattern = 1;
+ repeated string log_exclude_pattern = 2;
+ optional int64 log_monitor_interval = 3 [default = 0];
}
enum ApplicationAccessTypeProto {
@@ -340,6 +347,7 @@ message ContainerLaunchContextProto {
repeated StringStringMapProto environment = 4;
repeated string command = 5;
repeated ApplicationACLMapProto application_ACLs = 6;
+ optional LogContextProto log_context = 7;
}
message ContainerStatusProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index c2f3268..77e255f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -31,6 +33,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -53,6 +56,7 @@
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set<String> applicationTags = null;
+ private LogContext logContext = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -110,6 +114,9 @@ private void mergeLocalToBuilder() {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.logContext != null) {
+ builder.setLogContext(convertToProtoFormat(this.logContext));
+ }
}
private void mergeLocalToProto() {
@@ -402,4 +409,33 @@ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
+
+ private LogContextPBImpl convertFromProtoFormat(LogContextProto p) {
+ return new LogContextPBImpl(p);
+ }
+
+ private LogContextProto convertToProtoFormat(LogContext t) {
+ return ((LogContextPBImpl)t).getProto();
+ }
+
+ @Override
+ public LogContext getLogContext() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logContext != null) {
+ return this.logContext;
+ } // Else via proto
+ if (!p.hasLogContext()) {
+ return null;
+ }
+ logContext = convertFromProtoFormat(p.getLogContext());
+ return logContext;
+ }
+
+  @Override
+  public void setLogContext(LogContext logContext) {
+    maybeInitBuilder();
+    if (logContext == null) {
+      builder.clearLogContext();
+    }
+    this.logContext = logContext;
+  }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
index 12dcfcd..0373ef2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
@@ -30,10 +30,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
@@ -56,6 +58,7 @@
private Map<String, String> environment = null;
private List<String> commands = null;
private Map<ApplicationAccessType, String> applicationACLS = null;
+ private LogContext logContext = null;
public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder();
@@ -120,6 +123,9 @@ private void mergeLocalToBuilder() {
if (this.applicationACLS != null) {
addApplicationACLs();
}
+ if (this.logContext != null) {
+ builder.setLogContext(convertToProtoFormat(this.logContext));
+ }
}
private void mergeLocalToProto() {
@@ -463,4 +469,34 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
private LocalResourceProto convertToProtoFormat(LocalResource t) {
return ((LocalResourcePBImpl)t).getProto();
}
+
+ private LogContextPBImpl convertFromProtoFormat(LogContextProto p) {
+ return new LogContextPBImpl(p);
+ }
+
+ private LogContextProto convertToProtoFormat(LogContext t) {
+ return ((LogContextPBImpl)t).getProto();
+ }
+
+ @Override
+ public LogContext getLogContext() {
+ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logContext != null) {
+ return this.logContext;
+ } // Else via proto
+ if (!p.hasLogContext()) {
+ return null;
+ }
+ logContext = convertFromProtoFormat(p.getLogContext());
+ return logContext;
+ }
+
+  @Override
+  public void setLogContext(LogContext logContext) {
+    maybeInitBuilder();
+    if (logContext == null) {
+      builder.clearLogContext();
+    }
+    this.logContext = logContext;
+  }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
new file mode 100644
index 0000000..6cd2b68
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogContextPBImpl extends LogContext {
+ LogContextProto proto =
+ LogContextProto.getDefaultInstance();
+ LogContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+  private Set<String> logIncludePatterns = null;
+  private Set<String> logExcludePatterns = null;
+
+ public LogContextPBImpl() {
+ builder = LogContextProto.newBuilder();
+ }
+
+ public LogContextPBImpl(LogContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.logIncludePatterns != null && !this.logIncludePatterns.isEmpty()) {
+ builder.clearLogIncludePattern();
+ builder.addAllLogIncludePattern(this.logIncludePatterns);
+ }
+
+ if (this.logExcludePatterns != null && !this.logExcludePatterns.isEmpty()) {
+ builder.clearLogExcludePattern();
+ builder.addAllLogExcludePattern(this.logExcludePatterns);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+  public Set<String> getLogIncludePatterns() {
+ initLogIncludePatterns();
+ return this.logIncludePatterns;
+ }
+
+ @Override
+  public void setLogIncludePatterns(Set<String> logPattern) {
+ maybeInitBuilder();
+ if (logPattern == null || logPattern.isEmpty()) {
+ builder.clearLogIncludePattern();
+ this.logIncludePatterns = null;
+ return;
+ }
+
+    this.logIncludePatterns = new HashSet<String>();
+ this.logIncludePatterns.addAll(logPattern);
+ }
+
+ private void initLogIncludePatterns() {
+ if (this.logIncludePatterns != null) {
+ return;
+ }
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+    this.logIncludePatterns = new HashSet<String>();
+ this.logIncludePatterns.addAll(p.getLogIncludePatternList());
+ }
+
+ @Override
+  public Set<String> getLogExcludePatterns() {
+ initLogExcludePatterns();
+ return this.logExcludePatterns;
+ }
+
+ @Override
+  public void setLogExcludePatterns(Set<String> logPattern) {
+ maybeInitBuilder();
+ if (logPattern == null || logPattern.isEmpty()) {
+ builder.clearLogExcludePattern();
+ this.logExcludePatterns = null;
+ return;
+ }
+
+    this.logExcludePatterns = new HashSet<String>();
+ this.logExcludePatterns.addAll(logPattern);
+ }
+
+ private void initLogExcludePatterns() {
+ if (this.logExcludePatterns != null) {
+ return;
+ }
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+    this.logExcludePatterns = new HashSet<String>();
+ this.logExcludePatterns.addAll(p.getLogExcludePatternList());
+ }
+
+ @Override
+ public long getLogInterval() {
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getLogMonitorInterval();
+ }
+
+ @Override
+ public void setLogInterval(long logInterval) {
+ maybeInitBuilder();
+ builder.setLogMonitorInterval(logInterval);
+ }
+}
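A quick round-trip sketch of the new PB record: build a LogContext, serialize
it to its proto form, and rebuild it. This mirrors what TestPBImplRecords
exercises further down; all of the names come from this patch, and the
values are illustrative:

    import java.util.Collections;
    import org.apache.hadoop.yarn.api.records.LogContext;
    import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
    import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;

    LogContext original = LogContext.newInstance(
        Collections.singleton("syslog*"),
        Collections.singleton("\\.tmp"), 1000L);
    LogContextProto proto = ((LogContextPBImpl) original).getProto();
    LogContext restored = new LogContextPBImpl(proto);
    assert restored.getLogInterval() == 1000L;
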
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 3568de2..799d518 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -24,6 +24,7 @@
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -35,10 +36,15 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,6 +66,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -149,6 +156,9 @@ public String toString() {
private final List<String> rootLogDirs;
private final ContainerId containerId;
private final String user;
+ private final LogContext logContext;
+    private Set<Path> uploadedFiles = new HashSet<Path>();
+ private long previousUploadTime;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
@@ -160,6 +170,19 @@ public LogValue(List rootLogDirs, ContainerId containerId,
// Ensure logs are processed in lexical order
Collections.sort(this.rootLogDirs);
+ this.logContext = null;
+ }
+
+    public LogValue(List<String> rootLogDirs, ContainerId containerId,
+        String user, LogContext logContext, long previousUploadTime) {
+      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
+ this.containerId = containerId;
+ this.user = user;
+
+ // Ensure logs are processed in lexical order
+ Collections.sort(this.rootLogDirs);
+ this.logContext = logContext;
+ this.previousUploadTime = previousUploadTime;
}
public void write(DataOutputStream out) throws IOException {
@@ -178,7 +201,9 @@ public void write(DataOutputStream out) throws IOException {
}
// Write out log files in lexical order
- File[] logFiles = containerLogDir.listFiles();
+ File[] logFiles = getLogFiles(containerLogDir);
+
+      addToUploadedFiles(logFiles);
Arrays.sort(logFiles);
for (File logFile : logFiles) {
@@ -232,6 +257,62 @@ public void write(DataOutputStream out) throws IOException {
public String getUser() {
return user;
}
+
+    private File[] getLogFiles(File containerLogDir) {
+      File[] logFiles = containerLogDir.listFiles();
+      if (logFiles == null) {
+        // The container log dir may already have been cleaned up.
+        return new File[0];
+      }
+      if (this.logContext != null) {
+        if (this.logContext.getLogIncludePatterns() != null
+            && !this.logContext.getLogIncludePatterns().isEmpty()) {
+          // Include patterns are file-name wildcards.
+          FileFilter includeLogFileFilter =
+              new WildcardFileFilter(new ArrayList<String>(
+                  this.logContext.getLogIncludePatterns()));
+          logFiles = containerLogDir.listFiles(includeLogFileFilter);
+        }
+        // Filter out old log files. Any file last modified before
+        // this.previousUploadTime should already have been uploaded
+        // in a previous run.
+        Set<File> newLogs = new HashSet<File>();
+        for (File logFile : logFiles) {
+          if (logFile.lastModified() > this.previousUploadTime) {
+            newLogs.add(logFile);
+          }
+        }
+        logFiles = newLogs.toArray(new File[newLogs.size()]);
+
+        if (this.logContext.getLogExcludePatterns() != null
+            && !this.logContext.getLogExcludePatterns().isEmpty()) {
+          // Exclude patterns are regular expressions.
+          Set<File> unMatchedFiles = new HashSet<File>();
+          for (File logFile : logFiles) {
+            boolean matched = false;
+            for (String excludePattern : this.logContext
+                .getLogExcludePatterns()) {
+              Pattern p = Pattern.compile(excludePattern);
+              Matcher m = p.matcher(logFile.getName());
+              if (m.find()) {
+                matched = true;
+                break;
+              }
+            }
+            if (!matched) {
+              unMatchedFiles.add(logFile);
+            }
+          }
+          logFiles = unMatchedFiles.toArray(new File[unMatchedFiles.size()]);
+        }
+      }
+      return logFiles;
+    }
+
+    private void addToUploadedFiles(File[] logFiles) {
+      for (File logFile : logFiles) {
+        this.uploadedFiles.add(new Path(logFile.getAbsolutePath()));
+      }
+    }
+
+    public Set<Path> getUploadedFiles() {
+      return this.uploadedFiles;
+    }
}
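The filtering above is deliberately asymmetric: include patterns are matched
as file-name wildcards via commons-io's WildcardFileFilter, while exclude
patterns are compiled as java.util.regex expressions and matched with
Matcher.find(). A small self-contained sketch of the difference, using
hypothetical file names and directory:

    import java.io.File;
    import java.io.FileFilter;
    import java.util.regex.Pattern;
    import org.apache.commons.io.filefilter.WildcardFileFilter;

    public class LogFilterDemo {
      public static void main(String[] args) {
        File dir = new File("/tmp/container_logs");  // hypothetical log dir
        // Wildcard include: matches syslog, syslog.1, syslog.old, ...
        FileFilter include = new WildcardFileFilter("syslog*");
        // Regex exclude: drops any name containing ".tmp".
        Pattern exclude = Pattern.compile("\\.tmp");
        File[] candidates = dir.listFiles(include);
        if (candidates != null) {
          for (File f : candidates) {
            if (!exclude.matcher(f.getName()).find()) {
              System.out.println("would upload: " + f.getName());
            }
          }
        }
      }
    }
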
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 4445ff9..96cbdf6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -28,6 +28,7 @@
@Private
public class LogAggregationUtils {
+ public static final String TMP_FILE = "TEMP.tmp";
/**
* Constructs the full filename for an application's log file per node.
* @param remoteRootLogDir
@@ -44,6 +45,21 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
}
/**
+ * Get the remote node dir under remote app log dir.
+ * @param remoteRootLogDir
+ * @param appId
+ * @param user
+ * @param nodeId
+ * @param suffix
+   * @return the remote node log directory for the application.
+ */
+ public static Path getRemoteNodeLogDirForApp(Path remoteRootLogDir,
+ ApplicationId appId, String user, NodeId nodeId, String suffix) {
+ return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
+ getNodeString(nodeId));
+ }
+
+ /**
* Gets the remote app log dir.
* @param remoteRootLogDir
* @param appId
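For concreteness, a sketch of the per-node directory the new helper resolves,
assuming the usual remote layout of {remoteRootLogDir}/{user}/{suffix}/{appId}
and a NodeId of host:45454; the concrete values are illustrative:

    // Resolves to something like:
    //   /tmp/logs/alice/logs/application_1400000000000_0001/host_45454
    Path nodeDir = LogAggregationUtils.getRemoteNodeLogDirForApp(
        new Path("/tmp/logs"), appId, "alice", nodeId, "logs");
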
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 3bafdb3..0dc2778 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -52,19 +52,35 @@ public int dumpAContainersLogs(String appId, String containerId,
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
- Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
+ Path remoteAppNodeLogDir = LogAggregationUtils.getRemoteNodeLogFileForApp(
remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
ConverterUtils.toNodeId(nodeId), suffix);
-    AggregatedLogFormat.LogReader reader;
+    RemoteIterator<FileStatus> nodeFiles;
try {
- reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
- } catch (FileNotFoundException fnfe) {
- System.out.println("Logs not available at " + logPath.toString());
+ Path qualifiedLogDir =
+ FileContext.getFileContext(getConf()).makeQualified(
+ remoteAppNodeLogDir);
+ nodeFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(remoteAppNodeLogDir);
+ } catch (FileNotFoundException fnf) {
+ System.out.println("Logs not available at "
+ + remoteAppNodeLogDir.toString());
System.out
.println("Log aggregation has not completed or is not enabled.");
return -1;
}
- return dumpAContainerLogs(containerId, reader, System.out);
+ while (nodeFiles.hasNext()) {
+ FileStatus thisNodeFile = nodeFiles.next();
+ if (!thisNodeFile.getPath().getName()
+ .equals(LogAggregationUtils.TMP_FILE)) {
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(getConf(), new Path(
+ remoteAppNodeLogDir, thisNodeFile.getPath().getName()));
+ dumpAContainerLogs(containerId, reader, System.out);
+ }
+ }
+ return 0;
}
@Private
@@ -99,55 +115,78 @@ public int dumpAContainerLogs(String containerIdStr,
@Private
public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
PrintStream out) throws IOException {
- Path remoteRootLogDir = new Path(getConf().get(
- YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ Path remoteRootLogDir =
+ new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String user = appOwner;
- String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
+ String logDirSuffix =
+ LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
// TODO Change this to get a list of files from the LAS.
- Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogDir, appId, user, logDirSuffix);
+ Path remoteAppLogDir =
+ LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user,
+ logDirSuffix);
RemoteIterator<FileStatus> nodeFiles;
try {
Path qualifiedLogDir =
FileContext.getFileContext(getConf()).makeQualified(remoteAppLogDir);
- nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
- getConf()).listStatus(remoteAppLogDir);
+ nodeFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(remoteAppLogDir);
} catch (FileNotFoundException fnf) {
System.out.println("Logs not available at " + remoteAppLogDir.toString());
System.out
- .println("Log aggregation has not completed or is not enabled.");
+ .println("Log aggregation has not completed or is not enabled.");
return -1;
}
while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(
- getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
- try {
-
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
-
- while (valueStream != null) {
- String containerString = "\n\nContainer: " + key + " on "
- + thisNodeFile.getPath().getName();
- out.println(containerString);
- out.println(StringUtils.repeat("=", containerString.length()));
- while (true) {
+ FileStatus thisNodeDir = nodeFiles.next();
+ if (thisNodeDir.isDirectory()) {
+        RemoteIterator<FileStatus> logFiles;
+ try {
+ Path qualifiedLogDir =
+ FileContext.getFileContext(getConf()).makeQualified(
+ thisNodeDir.getPath());
+ logFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(thisNodeDir.getPath());
+ } catch (FileNotFoundException fnf) {
+ continue;
+ }
+ while (logFiles.hasNext()) {
+ FileStatus logFile = logFiles.next();
+ if (!logFile.getPath().getName().equals(LogAggregationUtils.TMP_FILE)) {
+          AggregatedLogFormat.LogReader reader =
+              new AggregatedLogFormat.LogReader(getConf(),
+                  logFile.getPath());
try {
- LogReader.readAContainerLogsForALogType(valueStream, out);
- } catch (EOFException eof) {
- break;
+
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+
+ while (valueStream != null) {
+ String containerString =
+ "\n\nContainer: " + key + " on "
+ + logFile.getPath().getName();
+ out.println(containerString);
+ out.println(StringUtils.repeat("=", containerString.length()));
+ while (true) {
+ try {
+ LogReader.readAContainerLogsForALogType(valueStream, out);
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ } finally {
+ reader.close();
}
}
-
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
}
- } finally {
- reader.close();
}
}
return 0;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index c6572e9..caacef8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -178,6 +178,7 @@ public static void setup() throws Exception {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(LogContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 12166e0..eb8b8c6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -72,9 +72,11 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -275,11 +277,16 @@ private void recoverApplication(ContainerManagerApplicationProto p)
aclProto.getAcl());
}
+    LogContext logContext = null;
+    if (p.hasLogContext()) {
+      logContext = new LogContextPBImpl(p.getLogContext());
+    }
+
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context);
context.getApplications().put(appId, app);
- app.handle(new ApplicationInitEvent(appId, acls));
+ app.handle(new ApplicationInitEvent(appId, acls, logContext));
}
@SuppressWarnings("unchecked")
@@ -719,13 +726,15 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls, LogContext logContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user);
-
+ if (logContext != null) {
+ builder.setLogContext(((LogContextPBImpl) logContext).getProto());
+ }
builder.clearCredentials();
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
@@ -828,10 +837,12 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
LOG.info("Creating a new application reference for app " + applicationID);
Map<ApplicationAccessType, String> appAcls =
container.getLaunchContext().getApplicationACLs();
+ LogContext logContext = container.getLaunchContext().getLogContext();
context.getNMStateStore().storeApplication(applicationID,
- buildAppProto(applicationID, user, credentials, appAcls));
+ buildAppProto(applicationID, user, credentials, appAcls,
+ logContext));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, appAcls));
+ new ApplicationInitEvent(applicationID, appAcls, logContext));
}
this.context.getNMStateStore().storeContainer(containerId, request);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index cc5544c..d6b3baf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -237,7 +237,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
- app.applicationACLs));
+ app.applicationACLs, initEvent.getLogContext()));
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
index 5746b6a..fdd828f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
@@ -22,18 +22,32 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogContext;
public class ApplicationInitEvent extends ApplicationEvent {
private final Map<ApplicationAccessType, String> applicationACLs;
+ private final LogContext logContext;
public ApplicationInitEvent(ApplicationId appId,
Map<ApplicationAccessType, String> acls) {
super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls;
+ this.logContext = null;
+ }
+
+  public ApplicationInitEvent(ApplicationId appId,
+      Map<ApplicationAccessType, String> acls, LogContext logContext) {
+ super(appId, ApplicationEventType.INIT_APPLICATION);
+ this.applicationACLs = acls;
+ this.logContext = logContext;
}
public Map<ApplicationAccessType, String> getApplicationACLs() {
return this.applicationACLs;
}
+
+ public LogContext getLogContext() {
+ return this.logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 1af48bb..0f3045a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -20,8 +20,10 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,14 +35,19 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -53,18 +60,18 @@
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
- private static final String TMP_FILE_SUFFIX = ".tmp";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
private final ApplicationId appId;
+ private final NodeId nodeId;
private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
- private final Path remoteNodeLogFileForApp;
- private final Path remoteNodeTmpLogFileForApp;
+ private final Path remoteNodeLogDirForApp;
+ private Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
private final BlockingQueue<ContainerId> pendingContainers;
@@ -72,67 +79,118 @@
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
-
- private LogWriter writer = null;
+ private final LogContext logContext;
+ private final Context context;
+ private String remoteLogFileName;
+  private Set<Path> uploadedFiles = new HashSet<Path>();
+ private long previousUploadTime = 0;
+ private long currentUploadTime = 0;
public AppLogAggregatorImpl(Dispatcher dispatcher,
- DeletionService deletionService, Configuration conf, ApplicationId appId,
- UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler,
- Path remoteNodeLogFileForApp,
+ DeletionService deletionService, Configuration conf, NodeId nodeId,
+ ApplicationId appId, UserGroupInformation userUgi,
+ LocalDirsHandlerService dirsHandler, Path remoteNodeLogDirForApp,
ContainerLogsRetentionPolicy retentionPolicy,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls, LogContext logContext,
+ Context context) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
+ this.nodeId = nodeId;
this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
this.dirsHandler = dirsHandler;
- this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
- this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
+ this.remoteNodeLogDirForApp = remoteNodeLogDirForApp;
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
+ this.logContext = logContext;
+ this.context = context;
}
- private void uploadLogsForContainer(ContainerId containerId) {
+  private void uploadLogsForContainer(ContainerId containerId,
+      LogWriter writer) {
if (this.logAggregationDisabled) {
return;
}
- // Lazy creation of the writer
- if (this.writer == null) {
- LOG.info("Starting aggregate log-file for app " + this.applicationId
- + " at " + this.remoteNodeTmpLogFileForApp);
- try {
- this.writer =
- new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
- this.userUgi);
- //Write ACLs once when and if the writer is created.
- this.writer.writeApplicationACLs(appAcls);
- this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
- } catch (IOException e) {
- LOG.error("Cannot create writer for app " + this.applicationId
- + ". Disabling log-aggregation for this app.", e);
- this.logAggregationDisabled = true;
- return;
- }
- }
-
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
LogKey logKey = new LogKey(containerId);
LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
- userUgi.getShortUserName());
+ userUgi.getShortUserName(), this.logContext, this.previousUploadTime);
try {
- this.writer.append(logKey, logValue);
+ writer.append(logKey, logValue);
} catch (IOException e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.");
}
+    uploadedFiles.addAll(logValue.getUploadedFiles());
+ }
+
+  private void uploadLogsForContainers(Set<ContainerId> containers) {
+ if (this.logAggregationDisabled || containers.isEmpty()) {
+ return;
+ }
+ this.remoteLogFileName =
+ nodeId.toString().replace(":", "_") + "_" + Time.now();
+ this.remoteNodeTmpLogFileForApp =
+ getRemoteNodeTmpLogFileForApp(this.remoteNodeLogDirForApp);
+ LogWriter writer = null;
+ try {
+ try {
+ writer =
+ new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
+ this.userUgi);
+ // Write ACLs once when the writer is created.
+ writer.writeApplicationACLs(appAcls);
+ writer.writeApplicationOwner(this.userUgi.getShortUserName());
+
+ } catch (IOException e1) {
+ LOG.error("Cannot create writer for app " + this.applicationId
+ + ". Disabling log-aggregation for this app.");
+ this.logAggregationDisabled = true;
+ return;
+ }
+
+ uploadedFiles.clear();
+ this.currentUploadTime = Time.now();
+ for (ContainerId container : containers) {
+ uploadLogsForContainer(container, writer);
+ }
+ this.previousUploadTime = this.currentUploadTime;
+
+ this.delService.delete(this.userUgi.getShortUserName(), null,
+ uploadedFiles.toArray(new Path[uploadedFiles.size()]));
+
+ if (writer != null) {
+ writer.close();
+ }
+
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction