diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 723a2e0..05d7648 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -54,6 +54,7 @@
* validityInterval into failure count. If failure count reaches to
* maxAppAttempts, the application will be failed.
*
+ *   <li>{@link LogContext} of the application.</li>
*
*
*
@@ -128,6 +129,21 @@ public static ApplicationSubmissionContext newInstance(
return context;
}
+ @Public
+ @Unstable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers, LogContext logContext) {
+ ApplicationSubmissionContext context =
+ newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers);
+ context.setLogContext(logContext);
+ return context;
+ }
/**
* Get the ApplicationId of the submitted application.
* @return ApplicationId of the submitted application
@@ -381,4 +397,21 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Stable
public abstract void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval);
-}
\ No newline at end of file
+
+
+ /**
+ * Get LogContext of the application
+ * @return LogContext of the application
+ */
+ @Public
+ @Stable
+ public abstract LogContext getLogContext();
+
+ /**
+ * Set LogContext for the application
+   * @param logContext the LogContext for the application
+ */
+ @Public
+ @Stable
+ public abstract void setLogContext(LogContext logContext);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
index a648fef..cd20939 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
@@ -22,8 +22,10 @@
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
@@ -46,6 +48,7 @@
  *   <li>Optional, application-specific binary service data.</li>
  *   <li>Environment variables for the launched process.</li>
  *   <li>Command to launch the container.</li>
+ *   <li>{@link LogContext} will be set by the RM when it launches the AM.</li>
*
*
*
@@ -73,6 +76,18 @@ public static ContainerLaunchContext newInstance(
return container;
}
+ @Private
+ @Unstable
+ public static ContainerLaunchContext newInstance(
+      Map<String, LocalResource> localResources,
+      Map<String, String> environment, List<String> commands,
+      Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
+      Map<ApplicationAccessType, String> acls, LogContext logContext) {
+ ContainerLaunchContext container = newInstance(localResources, environment,
+ commands, serviceData, tokens, acls);
+ container.setLogContext(logContext);
+ return container;
+ }
/**
* Get all the tokens needed by this container. It may include file-system
* tokens, ApplicationMaster related tokens if this container is an
@@ -196,4 +211,12 @@ public static ContainerLaunchContext newInstance(
@Public
@Stable
   public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+
+ @Private
+ @Stable
+ public abstract LogContext getLogContext();
+
+ @Private
+ @Stable
+ public abstract void setLogContext(LogContext logContext);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
new file mode 100644
index 0000000..c730573
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p><code>LogContext</code> represents all of the
+ * information needed by the <code>NodeManager</code> to handle
+ * the logs for an application.</p>
+ *
+ * <p>It includes details such as:
+ *   <ul>
+ *     <li>logIncludePattern. It defines the include patterns which are used
+ *     to filter the log files. The log files which match the include
+ *     patterns will be uploaded.</li>
+ *     <li>logExcludePattern. It defines the exclude patterns which are used
+ *     to filter the log files. The log files which match the exclude
+ *     patterns will not be uploaded.</li>
+ *     <li>logInterval. The default value is 0. By default, the
+ *     logAggregationService only uploads container logs when the
+ *     application is finished. This configuration defines how often the
+ *     logAggregationService uploads container logs, in days. By setting
+ *     this configuration, the logAggregationService can upload container
+ *     logs periodically while the application is running.</li>
+ *   </ul>
+ * </p>
+ *
+ * @see ContainerLaunchContext
+ * @see ApplicationSubmissionContext
+ */
+public abstract class LogContext {
+
+ @Public
+ @Unstable
+  public static LogContext newInstance(Set<String> logIncludePattern,
+      Set<String> logExcludePattern, long logInterval) {
+ Preconditions.checkArgument(logInterval >= 0);
+ LogContext context = Records.newRecord(LogContext.class);
+ context.setLogIncludePatterns(logIncludePattern);
+ context.setLogExcludePatterns(logExcludePattern);
+ context.setLogInterval(logInterval);
+ return context;
+ }
+
+ /**
+ * Get include patterns
+ * @return set of include patterns
+ */
+ @Public
+ @Stable
+  public abstract Set<String> getLogIncludePatterns();
+
+ /**
+ * Set include patterns
+ * @param includePatterns to set
+ */
+ @Public
+ @Stable
+  public abstract void setLogIncludePatterns(Set<String> includePatterns);
+
+ /**
+ * Get exclude patterns
+ * @return set of exclude patterns
+ */
+ @Public
+ @Stable
+  public abstract Set<String> getLogExcludePatterns();
+
+ /**
+ * Set exclude patterns
+ * @param excludePatterns to set
+ */
+ @Public
+ @Stable
+  public abstract void setLogExcludePatterns(Set<String> excludePatterns);
+
+ /**
+   * Get the logInterval, in days.
+ * @return the logInterval
+ */
+ @Public
+ @Stable
+ public abstract long getLogInterval();
+
+ /**
+   * Set the logInterval, in days.
+   * @param logInterval the log upload interval in days
+ */
+ @Public
+ @Stable
+ public abstract void setLogInterval(long logInterval);
+}
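
For reference, a minimal sketch of how a client could wire the new record into
a submission, assuming the LogContext.newInstance factory above and the new
ApplicationSubmissionContext.newInstance overload; appId and amContainer are
placeholders for values the client already has:

    // Upload only stdout*/syslog* files, skip *.tmp files, and roll the
    // upload once a day while the app runs (0 keeps the old behavior of
    // uploading only at application finish).
    Set<String> includes = new HashSet<String>(Arrays.asList("stdout*", "syslog*"));
    Set<String> excludes = new HashSet<String>(Arrays.asList("*.tmp"));
    LogContext logContext = LogContext.newInstance(includes, excludes, 1L);

    ApplicationSubmissionContext submission =
        ApplicationSubmissionContext.newInstance(appId, "my-app", "default",
            Priority.newInstance(0), amContainer, false, true, 2,
            Resource.newInstance(1024, 1), "YARN", false, logContext);
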
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d8c42cc..846cdc5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto {
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
+ optional LogContextProto log_context = 14;
+}
+
+message LogContextProto {
+ repeated string log_include_pattern = 1;
+ repeated string log_exclude_pattern = 2;
+ optional int64 log_monitor_interval = 3 [default = 0];
}
enum ApplicationAccessTypeProto {
@@ -344,6 +351,7 @@ message ContainerLaunchContextProto {
repeated StringStringMapProto environment = 4;
repeated string command = 5;
repeated ApplicationACLMapProto application_ACLs = 6;
+ optional LogContextProto log_context = 7;
}
message ContainerStatusProto {
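
As a sanity check on the wire format, a hedged sketch of building the new
message through the generated builder (accessor names follow the standard
protoc camel-casing of the fields declared above):

    // Equivalent of LogContext.newInstance at the protobuf level;
    // log_monitor_interval defaults to 0 when unset.
    LogContextProto proto = LogContextProto.newBuilder()
        .addLogIncludePattern("stdout*")
        .addLogExcludePattern("*.tmp")
        .setLogMonitorInterval(1)
        .build();
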
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
index eb6169c..c31316a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
@@ -31,7 +31,6 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,8 +38,6 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -113,17 +110,18 @@ public int run(String[] args) throws Exception {
System.err.println("Invalid ApplicationId specified");
return -1;
}
-
+
try {
int resultCode = verifyApplicationState(appId);
if (resultCode != 0) {
- System.out.println("Application has not completed." +
- " Logs are only available after an application completes");
+ System.out.println("Application is not running or completed."
+ + " Logs are only available when an application is running"
+ + " or completes");
return resultCode;
}
} catch (Exception e) {
- System.err.println("Unable to get ApplicationState." +
- " Attempting to fetch logs directly from the filesystem.");
+ System.err.println("Unable to get ApplicationState."
+ + " Attempting to fetch logs directly from the filesystem.");
}
LogCLIHelpers logCliHelper = new LogCLIHelpers();
@@ -141,18 +139,9 @@ public int run(String[] args) throws Exception {
printHelpMessage(printOpts);
resultCode = -1;
} else {
- Path remoteRootLogDir =
- new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
- AggregatedLogFormat.LogReader reader =
- new AggregatedLogFormat.LogReader(getConf(),
- LogAggregationUtils.getRemoteNodeLogFileForApp(
- remoteRootLogDir,
- appId,
- appOwner,
- ConverterUtils.toNodeId(nodeAddress),
- LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
- resultCode = logCliHelper.dumpAContainerLogs(containerIdStr, reader, System.out);
+ resultCode =
+ logCliHelper.dumpAContainersLogs(appIdStr, containerIdStr,
+ nodeAddress, appOwner);
}
return resultCode;
@@ -167,10 +156,10 @@ private int verifyApplicationState(ApplicationId appId) throws IOException,
switch (appReport.getYarnApplicationState()) {
case NEW:
case NEW_SAVING:
+ return -1;
case ACCEPTED:
case SUBMITTED:
case RUNNING:
- return -1;
case FAILED:
case FINISHED:
case KILLED:
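
With this change the CLI no longer opens a single aggregated file but scans
the per-node log directory, so container logs can be fetched while the
application is still running. An illustrative invocation (ids and host are
placeholders):

    $ yarn logs -applicationId application_1414530900704_0003 \
        -containerId container_1414530900704_0003_01_000001 \
        -nodeAddress host1.example.com:45454 -appOwner hadoop
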
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 7b49a16..83d174d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -31,6 +33,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -53,6 +56,7 @@
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
   private Set<String> applicationTags = null;
+ private LogContext logContext = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -110,6 +114,9 @@ private void mergeLocalToBuilder() {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.logContext != null) {
+ builder.setLogContext(convertToProtoFormat(this.logContext));
+ }
}
private void mergeLocalToProto() {
@@ -413,6 +420,35 @@ public long getAttemptFailuresValidityInterval() {
public void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval) {
maybeInitBuilder();
     builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
   }
+
+
+ private LogContextPBImpl convertFromProtoFormat(LogContextProto p) {
+ return new LogContextPBImpl(p);
+ }
+
+ private LogContextProto convertToProtoFormat(LogContext t) {
+ return ((LogContextPBImpl)t).getProto();
+ }
+
+ @Override
+ public LogContext getLogContext() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logContext != null) {
+ return this.logContext;
+ } // Else via proto
+ if (!p.hasLogContext()) {
+ return null;
+ }
+ logContext = convertFromProtoFormat(p.getLogContext());
+ return logContext;
+ }
+
+ @Override
+ public void setLogContext(LogContext logContext) {
+ maybeInitBuilder();
+    if (logContext == null) {
+      builder.clearLogContext();
+    }
+    this.logContext = logContext;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
index 12dcfcd..0373ef2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
@@ -30,10 +30,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
@@ -56,6 +58,7 @@
   private Map<String, String> environment = null;
   private List<String> commands = null;
   private Map<ApplicationAccessType, String> applicationACLS = null;
+ private LogContext logContext = null;
public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder();
@@ -120,6 +123,9 @@ private void mergeLocalToBuilder() {
if (this.applicationACLS != null) {
addApplicationACLs();
}
+ if (this.logContext != null) {
+ builder.setLogContext(convertToProtoFormat(this.logContext));
+ }
}
private void mergeLocalToProto() {
@@ -463,4 +469,34 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
private LocalResourceProto convertToProtoFormat(LocalResource t) {
return ((LocalResourcePBImpl)t).getProto();
}
+
+
+ private LogContextPBImpl convertFromProtoFormat(LogContextProto p) {
+ return new LogContextPBImpl(p);
+ }
+
+ private LogContextProto convertToProtoFormat(LogContext t) {
+ return ((LogContextPBImpl)t).getProto();
+ }
+
+ @Override
+ public LogContext getLogContext() {
+ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logContext != null) {
+ return this.logContext;
+ } // Else via proto
+ if (!p.hasLogContext()) {
+ return null;
+ }
+ logContext = convertFromProtoFormat(p.getLogContext());
+ return logContext;
+ }
+
+ @Override
+ public void setLogContext(LogContext logContext) {
+ maybeInitBuilder();
+    if (logContext == null) {
+      builder.clearLogContext();
+    }
+    this.logContext = logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
new file mode 100644
index 0000000..73ccf71
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProtoOrBuilder;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogContextPBImpl extends LogContext {
+ LogContextProto proto =
+ LogContextProto.getDefaultInstance();
+ LogContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+  private Set<String> logIncludePatterns = null;
+  private Set<String> logExcludePatterns = null;
+
+ public LogContextPBImpl() {
+ builder = LogContextProto.newBuilder();
+ }
+
+ public LogContextPBImpl(LogContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.logIncludePatterns != null && !this.logIncludePatterns.isEmpty()) {
+ builder.clearLogIncludePattern();
+ builder.addAllLogIncludePattern(this.logIncludePatterns);
+ }
+
+ if (this.logExcludePatterns != null && !this.logExcludePatterns.isEmpty()) {
+ builder.clearLogExcludePattern();
+ builder.addAllLogExcludePattern(this.logExcludePatterns);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+  @Override
+  public Set<String> getLogIncludePatterns() {
+    initLogIncludePatterns();
+    return this.logIncludePatterns;
+  }
+
+  @Override
+  public void setLogIncludePatterns(Set<String> logPattern) {
+    maybeInitBuilder();
+    if (logPattern == null || logPattern.isEmpty()) {
+      builder.clearLogIncludePattern();
+      this.logIncludePatterns = null;
+      return;
+    }
+
+    this.logIncludePatterns = new HashSet<String>();
+    this.logIncludePatterns.addAll(logPattern);
+  }
+
+  private void initLogIncludePatterns() {
+    if (this.logIncludePatterns != null) {
+      return;
+    }
+    LogContextProtoOrBuilder p = viaProto ? proto : builder;
+    this.logIncludePatterns = new HashSet<String>();
+    this.logIncludePatterns.addAll(p.getLogIncludePatternList());
+  }
+
+  @Override
+  public Set<String> getLogExcludePatterns() {
+    initLogExcludePatterns();
+    return this.logExcludePatterns;
+  }
+
+  @Override
+  public void setLogExcludePatterns(Set<String> logPattern) {
+    maybeInitBuilder();
+    if (logPattern == null || logPattern.isEmpty()) {
+      builder.clearLogExcludePattern();
+      this.logExcludePatterns = null;
+      return;
+    }
+
+    this.logExcludePatterns = new HashSet<String>();
+    this.logExcludePatterns.addAll(logPattern);
+  }
+
+  private void initLogExcludePatterns() {
+    if (this.logExcludePatterns != null) {
+      return;
+    }
+    LogContextProtoOrBuilder p = viaProto ? proto : builder;
+    this.logExcludePatterns = new HashSet<String>();
+    this.logExcludePatterns.addAll(p.getLogExcludePatternList());
+  }
+
+ @Override
+ public long getLogInterval() {
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getLogMonitorInterval();
+ }
+
+ @Override
+ public void setLogInterval(long logInterval) {
+ Preconditions.checkArgument(logInterval >= 0);
+ maybeInitBuilder();
+ builder.setLogMonitorInterval(logInterval);
+ }
+}
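
A hedged round-trip sketch for the new PBImpl, mirroring what the
TestPBImplRecords change below exercises; it assumes the YARN record factory
resolves LogContext to LogContextPBImpl (run with -ea for the asserts):

    LogContext original = LogContext.newInstance(
        Collections.singleton("stdout*"), Collections.singleton("*.tmp"), 1L);
    LogContextProto proto = ((LogContextPBImpl) original).getProto();
    LogContext restored = new LogContextPBImpl(proto);
    // The proto round trip must preserve the patterns and the interval.
    assert restored.getLogIncludePatterns().equals(original.getLogIncludePatterns());
    assert restored.getLogInterval() == original.getLogInterval();
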
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 3568de2..729a275 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -24,6 +24,7 @@
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -35,10 +36,12 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-
+import java.util.Set;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,10 +63,15 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
@Public
@Evolving
public class AggregatedLogFormat {
@@ -149,6 +157,10 @@ public String toString() {
     private final List<String> rootLogDirs;
private final ContainerId containerId;
private final String user;
+ private final LogContext logContext;
+    private Set<File> uploadedFiles = new HashSet<File>();
+    private final Set<String> alreadyUploadedLogFiles;
+    private Set<String> allExistingFileMeta = new HashSet<String>();
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
@@ -160,6 +172,21 @@ public LogValue(List<String> rootLogDirs, ContainerId containerId,
// Ensure logs are processed in lexical order
Collections.sort(this.rootLogDirs);
+ this.logContext = null;
+      this.alreadyUploadedLogFiles = new HashSet<String>();
+ }
+
+    public LogValue(List<String> rootLogDirs, ContainerId containerId,
+        String user, LogContext logContext,
+        Set<String> alreadyUploadedLogFiles) {
+      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
+ this.containerId = containerId;
+ this.user = user;
+
+ // Ensure logs are processed in lexical order
+ Collections.sort(this.rootLogDirs);
+ this.logContext = logContext;
+ this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
}
public void write(DataOutputStream out) throws IOException {
@@ -178,10 +205,13 @@ public void write(DataOutputStream out) throws IOException {
}
// Write out log files in lexical order
- File[] logFiles = containerLogDir.listFiles();
- Arrays.sort(logFiles);
- for (File logFile : logFiles) {
+ this.uploadedFiles = getLogFiles(containerLogDir);
+
+      List<File> fileList = new ArrayList<File>(uploadedFiles);
+ Collections.sort(fileList);
+
+ for (File logFile : fileList) {
final long fileLength = logFile.length();
// Write the logFile Type
@@ -232,6 +262,73 @@ public void write(DataOutputStream out) throws IOException {
public String getUser() {
return user;
}
+
+    private Set<File> getLogFiles(File containerLogDir) {
+      Set<File> candidates =
+          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
+      for (File logFile : candidates) {
+        this.allExistingFileMeta.add(fileMeta(logFile));
+      }
+
+      if (this.logContext != null && candidates.size() > 0) {
+        if (this.logContext.getLogIncludePatterns() != null
+            && !this.logContext.getLogIncludePatterns().isEmpty()) {
+          FileFilter includeLogFileFilter =
+              new WildcardFileFilter(new ArrayList<String>(
+                  this.logContext.getLogIncludePatterns()));
+          candidates =
+              new HashSet<File>(Arrays.asList(containerLogDir
+                  .listFiles(includeLogFileFilter)));
+        }
+
+        FileFilter excludeLogFileFilter = null;
+        if (this.logContext.getLogExcludePatterns() != null
+            && !this.logContext.getLogExcludePatterns().isEmpty()) {
+          excludeLogFileFilter =
+              new WildcardFileFilter(new ArrayList<String>(
+                  this.logContext.getLogExcludePatterns()));
+        }
+        final Set<File> excludeFiles =
+            excludeLogFileFilter == null ? new HashSet<File>()
+                : new HashSet<File>(Arrays.asList(containerLogDir
+                    .listFiles(excludeLogFileFilter)));
+        Iterable<File> mask =
+            Iterables.filter(candidates, new Predicate<File>() {
+              @Override
+              public boolean apply(File next) {
+                return !alreadyUploadedLogFiles.contains(fileMeta(next))
+                    && !excludeFiles.contains(next);
+              }
+            });
+        candidates = Sets.newHashSet(mask);
+      }
+      return candidates;
+    }
+
+    public Set<Path> getCurrentUpLoadedFilesPath() {
+      Set<Path> path = new HashSet<Path>();
+      for (File file : this.uploadedFiles) {
+        path.add(new Path(file.getAbsolutePath()));
+      }
+      return path;
+    }
+
+    public Set<String> getCurrentUpLoadedFileMeta() {
+      Set<String> info = new HashSet<String>();
+      for (File file : this.uploadedFiles) {
+        info.add(fileMeta(file));
+      }
+      return info;
+    }
+
+    public Set<String> getAllExistingFilesMeta() {
+      return this.allExistingFileMeta;
+    }
+
+ private String fileMeta(File file) {
+ return containerId.toString() + "_" + file.getName() + "_"
+ + file.lastModified();
+ }
}
/**
@@ -252,6 +349,9 @@ public LogWriter(final Configuration conf, final Path remoteAppLogFile,
public FSDataOutputStream run() throws Exception {
FileContext fc = FileContext.getFileContext(conf);
fc.setUMask(APP_LOG_FILE_UMASK);
+ if (fc.util().exists(remoteAppLogFile)) {
+ fc.delete(remoteAppLogFile, false);
+ }
return fc.create(
remoteAppLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
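
The include/exclude matching in getLogFiles() above is plain commons-io
wildcard filtering; a small sketch of the semantics, with containerLogDir
standing in for a real container log directory:

    // WildcardFileFilter matches glob-style patterns against file names only.
    FileFilter include =
        new WildcardFileFilter(Arrays.asList("syslog*", "stdout"));
    File[] kept = containerLogDir.listFiles(include);
    // Files matching an exclude pattern are subtracted afterwards, and files
    // whose (containerId, name, lastModified) meta was already uploaded are
    // skipped, so an unchanged file is never shipped twice.
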
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 4445ff9..96cbdf6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -28,6 +28,7 @@
@Private
public class LogAggregationUtils {
+ public static final String TMP_FILE = "TEMP.tmp";
/**
* Constructs the full filename for an application's log file per node.
* @param remoteRootLogDir
@@ -44,6 +45,21 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
}
/**
+ * Get the remote node dir under remote app log dir.
+ * @param remoteRootLogDir
+ * @param appId
+ * @param user
+ * @param nodeId
+ * @param suffix
+   * @return the remote node log directory.
+ */
+ public static Path getRemoteNodeLogDirForApp(Path remoteRootLogDir,
+ ApplicationId appId, String user, NodeId nodeId, String suffix) {
+ return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
+ getNodeString(nodeId));
+ }
+
+ /**
* Gets the remote app log dir.
* @param remoteRootLogDir
* @param appId
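
Under the default configuration (remote root /tmp/logs, dir suffix "logs"),
the new helper resolves to a per-node directory rather than a single file,
for example:

    /tmp/logs/hadoop/logs/application_1414530900704_0003/host1.example.com_45454

AppLogAggregatorImpl below writes one <nodeId>_<timestamp> file into this
directory per upload pass, which is what lets logs accumulate incrementally.
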
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 3bafdb3..1fbf5dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -52,19 +52,42 @@ public int dumpAContainersLogs(String appId, String containerId,
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
- Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
+ Path remoteAppNodeLogDir = LogAggregationUtils.getRemoteNodeLogFileForApp(
remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
ConverterUtils.toNodeId(nodeId), suffix);
- AggregatedLogFormat.LogReader reader;
+    RemoteIterator<FileStatus> nodeFiles;
try {
- reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
- } catch (FileNotFoundException fnfe) {
- System.out.println("Logs not available at " + logPath.toString());
+ Path qualifiedLogDir =
+ FileContext.getFileContext(getConf()).makeQualified(
+ remoteAppNodeLogDir);
+ nodeFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(remoteAppNodeLogDir);
+ } catch (FileNotFoundException fnf) {
+ System.out.println("Logs not available at "
+ + remoteAppNodeLogDir.toString());
System.out
.println("Log aggregation has not completed or is not enabled.");
return -1;
}
- return dumpAContainerLogs(containerId, reader, System.out);
+ boolean foundContainerLogs = false;
+ while (nodeFiles.hasNext()) {
+ FileStatus thisNodeFile = nodeFiles.next();
+ if (!thisNodeFile.getPath().getName()
+ .equals(LogAggregationUtils.TMP_FILE)) {
+        AggregatedLogFormat.LogReader reader =
+            new AggregatedLogFormat.LogReader(getConf(), new Path(
+                remoteAppNodeLogDir, thisNodeFile.getPath().getName()));
+        try {
+          if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
+            foundContainerLogs = true;
+          }
+        } finally {
+          reader.close();
+        }
+ }
+ }
+    if (!foundContainerLogs) {
+      System.out.println("Logs for container " + containerId
+          + " are not present in this log-file.");
+      return -1;
+    }
+    return 0;
}
@Private
@@ -81,8 +104,6 @@ public int dumpAContainerLogs(String containerIdStr,
}
if (valueStream == null) {
- System.out.println("Logs for container " + containerIdStr
- + " are not present in this log-file.");
return -1;
}
@@ -99,55 +120,77 @@ public int dumpAContainerLogs(String containerIdStr,
@Private
public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
PrintStream out) throws IOException {
- Path remoteRootLogDir = new Path(getConf().get(
- YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ Path remoteRootLogDir =
+ new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String user = appOwner;
- String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
+ String logDirSuffix =
+ LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
// TODO Change this to get a list of files from the LAS.
- Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
- remoteRootLogDir, appId, user, logDirSuffix);
+ Path remoteAppLogDir =
+ LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user,
+ logDirSuffix);
     RemoteIterator<FileStatus> nodeFiles;
try {
Path qualifiedLogDir =
FileContext.getFileContext(getConf()).makeQualified(remoteAppLogDir);
- nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
- getConf()).listStatus(remoteAppLogDir);
+ nodeFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(remoteAppLogDir);
} catch (FileNotFoundException fnf) {
System.out.println("Logs not available at " + remoteAppLogDir.toString());
System.out
- .println("Log aggregation has not completed or is not enabled.");
+ .println("Log aggregation has not completed or is not enabled.");
return -1;
}
while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(
- getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
- try {
-
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
-
- while (valueStream != null) {
- String containerString = "\n\nContainer: " + key + " on "
- + thisNodeFile.getPath().getName();
- out.println(containerString);
- out.println(StringUtils.repeat("=", containerString.length()));
- while (true) {
+ FileStatus thisNodeDir = nodeFiles.next();
+ if (thisNodeDir.isDirectory()) {
+        RemoteIterator<FileStatus> logFiles;
+ try {
+ Path qualifiedLogDir =
+ FileContext.getFileContext(getConf()).makeQualified(
+ thisNodeDir.getPath());
+ logFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(thisNodeDir.getPath());
+ } catch (FileNotFoundException fnf) {
+ continue;
+ }
+ while (logFiles.hasNext()) {
+ FileStatus logFile = logFiles.next();
+ if (!logFile.getPath().getName().equals(LogAggregationUtils.TMP_FILE)) {
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(getConf(),logFile.getPath());
try {
- LogReader.readAContainerLogsForALogType(valueStream, out);
- } catch (EOFException eof) {
- break;
+
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+
+ while (valueStream != null) {
+ String containerString =
+ "\n\nContainer: " + key + " on "
+ + logFile.getPath().getName();
+ out.println(containerString);
+ out.println(StringUtils.repeat("=", containerString.length()));
+ while (true) {
+ try {
+ LogReader.readAContainerLogsForALogType(valueStream, out);
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ } finally {
+ reader.close();
}
}
-
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
}
- } finally {
- reader.close();
}
}
return 0;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index c6572e9..caacef8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -178,6 +178,7 @@ public static void setup() throws Exception {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(LogContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 12166e0..eb8b8c6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -72,9 +72,11 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -275,11 +277,16 @@ private void recoverApplication(ContainerManagerApplicationProto p)
aclProto.getAcl());
}
+ LogContext logContext = null;
+    if (p.hasLogContext()) {
+ logContext = new LogContextPBImpl(p.getLogContext());
+ }
+
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context);
context.getApplications().put(appId, app);
- app.handle(new ApplicationInitEvent(appId, acls));
+ app.handle(new ApplicationInitEvent(appId, acls, logContext));
}
@SuppressWarnings("unchecked")
@@ -719,13 +726,15 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls, LogContext logContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user);
-
+ if (logContext != null) {
+ builder.setLogContext(((LogContextPBImpl) logContext).getProto());
+ }
builder.clearCredentials();
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
@@ -828,10 +837,12 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
LOG.info("Creating a new application reference for app " + applicationID);
       Map<ApplicationAccessType, String> appAcls =
container.getLaunchContext().getApplicationACLs();
+ LogContext logContext = container.getLaunchContext().getLogContext();
context.getNMStateStore().storeApplication(applicationID,
- buildAppProto(applicationID, user, credentials, appAcls));
+ buildAppProto(applicationID, user, credentials, appAcls,
+ logContext));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, appAcls));
+ new ApplicationInitEvent(applicationID, appAcls, logContext));
}
this.context.getNMStateStore().storeContainer(containerId, request);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index cc5544c..5eeb37b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -237,7 +237,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
- app.applicationACLs));
+ app.applicationACLs, initEvent.getLogContext()));
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
index 5746b6a..fdd828f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
@@ -22,18 +22,32 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogContext;
public class ApplicationInitEvent extends ApplicationEvent {
   private final Map<ApplicationAccessType, String> applicationACLs;
+ private final LogContext logContext;
public ApplicationInitEvent(ApplicationId appId,
       Map<ApplicationAccessType, String> acls) {
super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls;
+ this.logContext = null;
+ }
+
+ public ApplicationInitEvent(ApplicationId appId,
+      Map<ApplicationAccessType, String> acls, LogContext logContext) {
+ super(appId, ApplicationEventType.INIT_APPLICATION);
+ this.applicationACLs = acls;
+ this.logContext = logContext;
}
   public Map<ApplicationAccessType, String> getApplicationACLs() {
return this.applicationACLs;
}
+
+ public LogContext getLogContext() {
+ return this.logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 1af48bb..8d7c649 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -20,14 +20,17 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,35 +39,44 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
- private static final String TMP_FILE_SUFFIX = ".tmp";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
private final ApplicationId appId;
+ private final NodeId nodeId;
private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
- private final Path remoteNodeLogFileForApp;
- private final Path remoteNodeTmpLogFileForApp;
+ private final Path remoteNodeLogDirForApp;
+ private Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
   private final BlockingQueue<ContainerId> pendingContainers;
@@ -72,53 +84,37 @@
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
-
- private LogWriter writer = null;
+ private final LogContext logContext;
+ private final Context context;
+ private String remoteLogFileName;
+  private Set<Path> currentUploadedFiles = new HashSet<Path>();
+  private Set<String> alreadyUploadedLogs = new HashSet<String>();
+  private Set<String> currentExistingLogFiles = new HashSet<String>();
public AppLogAggregatorImpl(Dispatcher dispatcher,
- DeletionService deletionService, Configuration conf, ApplicationId appId,
- UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler,
- Path remoteNodeLogFileForApp,
+ DeletionService deletionService, Configuration conf, NodeId nodeId,
+ ApplicationId appId, UserGroupInformation userUgi,
+ LocalDirsHandlerService dirsHandler, Path remoteNodeLogDirForApp,
ContainerLogsRetentionPolicy retentionPolicy,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls, LogContext logContext,
+ Context context) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
+ this.nodeId = nodeId;
this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
this.dirsHandler = dirsHandler;
- this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
- this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
+ this.remoteNodeLogDirForApp = remoteNodeLogDirForApp;
this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
+ this.logContext = logContext;
+ this.context = context;
}
- private void uploadLogsForContainer(ContainerId containerId) {
-
- if (this.logAggregationDisabled) {
- return;
- }
-
- // Lazy creation of the writer
- if (this.writer == null) {
- LOG.info("Starting aggregate log-file for app " + this.applicationId
- + " at " + this.remoteNodeTmpLogFileForApp);
- try {
- this.writer =
- new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
- this.userUgi);
- //Write ACLs once when and if the writer is created.
- this.writer.writeApplicationACLs(appAcls);
- this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
- } catch (IOException e) {
- LOG.error("Cannot create writer for app " + this.applicationId
- + ". Disabling log-aggregation for this app.", e);
- this.logAggregationDisabled = true;
- return;
- }
- }
+ private void uploadLogsForContainer(ContainerId containerId, LogWriter writer) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
@@ -126,13 +122,89 @@ private void uploadLogsForContainer(ContainerId containerId) {
LogKey logKey = new LogKey(containerId);
LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
- userUgi.getShortUserName());
+ userUgi.getShortUserName(), this.logContext, this.alreadyUploadedLogs);
try {
- this.writer.append(logKey, logValue);
+ writer.append(logKey, logValue);
} catch (IOException e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.");
}
+ currentUploadedFiles.addAll(logValue.getCurrentUpLoadedFilesPath());
+ alreadyUploadedLogs.addAll(logValue.getCurrentUpLoadedFileMeta());
+ currentExistingLogFiles.addAll(logValue.getAllExistingFilesMeta());
+ }
+
+  private void uploadLogsForContainers(Set<ContainerId> containers) {
+ if (this.logAggregationDisabled || containers.isEmpty()) {
+ return;
+ }
+ this.remoteLogFileName =
+ nodeId.toString().replace(":", "_") + "_" + System.currentTimeMillis();
+ this.remoteNodeTmpLogFileForApp =
+ getRemoteNodeTmpLogFileForApp(this.remoteNodeLogDirForApp);
+
+ LogWriter writer = null;
+ try {
+ try {
+ writer =
+ new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
+ this.userUgi);
+ // Write ACLs once when the writer is created.
+ writer.writeApplicationACLs(appAcls);
+ writer.writeApplicationOwner(this.userUgi.getShortUserName());
+
+ } catch (IOException e1) {
+ LOG.error("Cannot create writer for app " + this.applicationId
+ + ". Skip log upload this time. ");
+ return;
+ }
+
+ currentUploadedFiles.clear();
+ currentExistingLogFiles.clear();
+ for (ContainerId container : containers) {
+ uploadLogsForContainer(container, writer);
+ }
+
+ this.delService.delete(this.userUgi.getShortUserName(), null,
+ currentUploadedFiles.toArray(new Path[currentUploadedFiles.size()]));
+
+      // If any of the previously uploaded log files have been deleted,
+      // we need to remove them from alreadyUploadedLogs.
+      Iterable<String> mask =
+          Iterables.filter(alreadyUploadedLogs, new Predicate<String>() {
+ @Override
+ public boolean apply(String next) {
+ return currentExistingLogFiles.contains(next);
+ }
+ });
+
+ alreadyUploadedLogs = Sets.newHashSet(mask);
+
+ if (writer != null) {
+ writer.close();
+ }
+
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction