 *   <li>Optional, application-specific binary service data.</li>
 *   <li>Environment variables for the launched process.</li>
 *   <li>Command to launch the container.</li>
+ *   <li>Optional, application-specific {@link LogContext}.</li>
 * </ul>
 *
 */
@@ -73,6 +74,18 @@ public static ContainerLaunchContext newInstance(
return container;
}
+ @Public
+ @Stable
+ public static ContainerLaunchContext newInstance(
+ Map<String, LocalResource> localResources,
+ Map<String, String> environment, List<String> commands,
+ Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
+ Map<ApplicationAccessType, String> acls, LogContext logContext) {
+ ContainerLaunchContext container = newInstance(localResources, environment,
+ commands, serviceData, tokens, acls);
+ container.setLogContext(logContext);
+ return container;
+ }
/**
* Get all the tokens needed by this container. It may include file-system
* tokens, ApplicationMaster related tokens if this container is an
@@ -196,4 +209,23 @@ public static ContainerLaunchContext newInstance(
@Public
@Stable
public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+
+ /**
+ * Get the LogContext of the application.
+ *
+ * @return LogContext of the application
+ */
+ @Public
+ @Stable
+ public abstract LogContext getLogContext();
+
+ /**
+ * Set the LogContext for the application.
+ *
+ * @param logContext LogContext for the application
+ */
+ @Public
+ @Stable
+ public abstract void setLogContext(LogContext logContext);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
new file mode 100644
index 0000000..cf4883d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * LogContext represents all of the
+ * information needed by the NodeManager to handle
+ * the logs for an application.
+ *
+ * It includes details such as:
+ * <ul>
+ *   <li>logIncludePatterns. Include patterns that are used to
+ *   filter the log files. The log files which match the include
+ *   patterns will be uploaded.</li>
+ *   <li>logExcludePatterns. Exclude patterns that are used to
+ *   filter the log files. The log files which match the exclude
+ *   patterns will not be uploaded. If a log file name matches both an
+ *   include and an exclude pattern, the file will be excluded.</li>
+ *   <li>logInterval. The default value is 0. By default,
+ *   the logAggregationService only uploads container logs when
+ *   the application is finished. This configuration defines
+ *   how often, in days, the logAggregationService uploads container logs.
+ *   By setting this configuration, the logAggregationService can upload
+ *   container logs periodically while the application is running.</li>
+ * </ul>
+ *
+ * @see ContainerLaunchContext
+ */
+public abstract class LogContext {
+
+ @Public
+ @Stable
+ public static LogContext newInstance(Set<String> logIncludePatterns,
+ Set<String> logExcludePatterns, long logInterval) {
+ Preconditions.checkArgument(logInterval >= 0);
+ LogContext context = Records.newRecord(LogContext.class);
+ context.setLogIncludePatterns(logIncludePatterns);
+ context.setLogExcludePatterns(logExcludePatterns);
+ context.setLogInterval(logInterval);
+ return context;
+ }
+
+ /**
+ * Get include patterns
+ * @return set of include patterns
+ */
+ @Public
+ @Stable
+ public abstract Set<String> getLogIncludePatterns();
+
+ /**
+ * Set include patterns
+ * @param includePatterns to set
+ */
+ @Public
+ @Stable
+ public abstract void setLogIncludePatterns(Set<String> includePatterns);
+
+ /**
+ * Get exclude patterns
+ * @return set of exclude patterns
+ */
+ @Public
+ @Stable
+ public abstract Set<String> getLogExcludePatterns();
+
+ /**
+ * Set exclude patterns
+ * @param excludePatterns to set
+ */
+ @Public
+ @Stable
+ public abstract void setLogExcludePatterns(Set<String> excludePatterns);
+
+ /**
+ * Get the log interval, in days.
+ * @return the logInterval
+ */
+ @Public
+ @Stable
+ public abstract long getLogInterval();
+
+ /**
+ * Set the log interval, in days.
+ * @param logInterval the interval, in days, between log uploads
+ */
+ @Public
+ @Stable
+ public abstract void setLogInterval(long logInterval);
+}
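For illustration, here is a minimal usage sketch of the API added above: a client builds a LogContext and hands it to the new ContainerLaunchContext.newInstance overload. The patterns and the one-day interval are hypothetical values chosen for the example, not defaults shipped by this patch.

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogContext;

public class LogContextUsageSketch {
  public static ContainerLaunchContext buildContext(List<String> commands) {
    // Upload only *.log files, never *.tmp files, and roll the
    // aggregated logs once a day while the application is running.
    Set<String> includes = new HashSet<String>(Collections.singleton("*.log"));
    Set<String> excludes = new HashSet<String>(Collections.singleton("*.tmp"));
    LogContext logContext = LogContext.newInstance(includes, excludes, 1);

    Map<String, LocalResource> localResources = Collections.emptyMap();
    Map<String, String> environment = Collections.emptyMap();
    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
    return ContainerLaunchContext.newInstance(localResources, environment,
        commands, serviceData, null /* tokens */, acls, logContext);
  }
}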
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index acc4a05..e0f2e51 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -667,6 +667,14 @@
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
+ /**
+ * This is for debug and test purposes only.
+ * To run log aggregation more frequently,
+ * set this configuration to a small value (smaller than 1).
+ */
+ public static final String DEBUG_LOG_AGGREGATION_SPEED_UP_RATIO =
+ YARN_PREFIX + "log-aggregation.speed-up-ratio";
+ public static final float DEFAULT_DEBUG_LOG_AGGREGATION_SPEED_UP_RATIO = 1;
/**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
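The ratio itself is only declared here; a hedged sketch of how a consumer could apply it follows. The helper below is hypothetical, written to match the comment's stated intent of shrinking a day-based rolling interval for tests.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RollingIntervalSketch {
  /**
   * Hypothetical helper: converts a LogContext interval given in days into
   * milliseconds, scaled by the debug speed-up ratio so that a test run can
   * cycle log aggregation in seconds rather than days.
   */
  public static long effectiveIntervalMs(Configuration conf,
      long logIntervalDays) {
    float ratio = conf.getFloat(
        YarnConfiguration.DEBUG_LOG_AGGREGATION_SPEED_UP_RATIO,
        YarnConfiguration.DEFAULT_DEBUG_LOG_AGGREGATION_SPEED_UP_RATIO);
    long dayInMs = 24L * 60 * 60 * 1000;
    return (long) (logIntervalDays * dayInMs * ratio);
  }
}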
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d8c42cc..35b2b60 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -294,6 +294,12 @@ message ApplicationSubmissionContextProto {
optional int64 attempt_failures_validity_interval = 13 [default = -1];
}
+message LogContextProto {
+ repeated string log_include_pattern = 1;
+ repeated string log_exclude_pattern = 2;
+ optional int64 log_monitor_interval = 3 [default = 0];
+}
+
enum ApplicationAccessTypeProto {
APPACCESS_VIEW_APP = 1;
APPACCESS_MODIFY_APP = 2;
@@ -344,6 +350,7 @@ message ContainerLaunchContextProto {
repeated StringStringMapProto environment = 4;
repeated string command = 5;
repeated ApplicationACLMapProto application_ACLs = 6;
+ optional LogContextProto log_context = 7;
}
message ContainerStatusProto {
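After protoc regenerates the Java sources, LogContextProto gets the standard builder API. A small sketch of constructing one directly; the method names follow protobuf's Java naming for the fields declared above.

import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;

public class LogContextProtoSketch {
  public static void main(String[] args) {
    LogContextProto proto = LogContextProto.newBuilder()
        .addLogIncludePattern("*.log")   // repeated string log_include_pattern
        .addLogExcludePattern("*.tmp")   // repeated string log_exclude_pattern
        .setLogMonitorInterval(1)        // optional int64, default 0
        .build();
    System.out.println(proto.getLogIncludePatternList()); // [*.log]
  }
}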
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
index eb6169c..aa67183 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.client.cli;
-import java.io.IOException;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -31,16 +29,11 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -113,18 +106,6 @@ public int run(String[] args) throws Exception {
System.err.println("Invalid ApplicationId specified");
return -1;
}
-
- try {
- int resultCode = verifyApplicationState(appId);
- if (resultCode != 0) {
- System.out.println("Application has not completed." +
- " Logs are only available after an application completes");
- return resultCode;
- }
- } catch (Exception e) {
- System.err.println("Unable to get ApplicationState." +
- " Attempting to fetch logs directly from the filesystem.");
- }
LogCLIHelpers logCliHelper = new LogCLIHelpers();
logCliHelper.setConf(getConf());
@@ -141,49 +122,14 @@ public int run(String[] args) throws Exception {
printHelpMessage(printOpts);
resultCode = -1;
} else {
- Path remoteRootLogDir =
- new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
- AggregatedLogFormat.LogReader reader =
- new AggregatedLogFormat.LogReader(getConf(),
- LogAggregationUtils.getRemoteNodeLogFileForApp(
- remoteRootLogDir,
- appId,
- appOwner,
- ConverterUtils.toNodeId(nodeAddress),
- LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
- resultCode = logCliHelper.dumpAContainerLogs(containerIdStr, reader, System.out);
+ resultCode =
+ logCliHelper.dumpAContainersLogs(appIdStr, containerIdStr,
+ nodeAddress, appOwner);
}
return resultCode;
}
- private int verifyApplicationState(ApplicationId appId) throws IOException,
- YarnException {
- YarnClient yarnClient = createYarnClient();
-
- try {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- switch (appReport.getYarnApplicationState()) {
- case NEW:
- case NEW_SAVING:
- case ACCEPTED:
- case SUBMITTED:
- case RUNNING:
- return -1;
- case FAILED:
- case FINISHED:
- case KILLED:
- default:
- break;
-
- }
- } finally {
- yarnClient.close();
- }
- return 0;
- }
-
@VisibleForTesting
protected YarnClient createYarnClient() {
YarnClient yarnClient = YarnClient.createYarnClient();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
index 12dcfcd..0373ef2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
@@ -30,10 +30,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
@@ -56,6 +58,7 @@
private Map<String, String> environment = null;
private List<String> commands = null;
private Map<ApplicationAccessType, String> applicationACLS = null;
+ private LogContext logContext = null;
public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder();
@@ -120,6 +123,9 @@ private void mergeLocalToBuilder() {
if (this.applicationACLS != null) {
addApplicationACLs();
}
+ if (this.logContext != null) {
+ builder.setLogContext(convertToProtoFormat(this.logContext));
+ }
}
private void mergeLocalToProto() {
@@ -463,4 +469,34 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
private LocalResourceProto convertToProtoFormat(LocalResource t) {
return ((LocalResourcePBImpl)t).getProto();
}
+
+
+ private LogContextPBImpl convertFromProtoFormat(LogContextProto p) {
+ return new LogContextPBImpl(p);
+ }
+
+ private LogContextProto convertToProtoFormat(LogContext t) {
+ return ((LogContextPBImpl)t).getProto();
+ }
+
+ @Override
+ public LogContext getLogContext() {
+ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logContext != null) {
+ return this.logContext;
+ } // Else via proto
+ if (!p.hasLogContext()) {
+ return null;
+ }
+ logContext = convertFromProtoFormat(p.getLogContext());
+ return logContext;
+ }
+
+ @Override
+ public void setLogContext(LogContext logContext) {
+ maybeInitBuilder();
+ if (logContext == null) {
+ builder.clearLogContext();
+ }
+ this.logContext = logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
new file mode 100644
index 0000000..73ccf71
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProtoOrBuilder;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogContextPBImpl extends LogContext {
+ LogContextProto proto =
+ LogContextProto.getDefaultInstance();
+ LogContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private Set<String> logIncludePatterns = null;
+ private Set<String> logExcludePatterns = null;
+
+ public LogContextPBImpl() {
+ builder = LogContextProto.newBuilder();
+ }
+
+ public LogContextPBImpl(LogContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.logIncludePatterns != null && !this.logIncludePatterns.isEmpty()) {
+ builder.clearLogIncludePattern();
+ builder.addAllLogIncludePattern(this.logIncludePatterns);
+ }
+
+ if (this.logExcludePatterns != null && !this.logExcludePatterns.isEmpty()) {
+ builder.clearLogExcludePattern();
+ builder.addAllLogExcludePattern(this.logExcludePatterns);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public Set<String> getLogIncludePatterns() {
+ initLogIncludePatterns();
+ return this.logIncludePatterns;
+ }
+
+ @Override
+ public void setLogIncludePatterns(Set<String> logPattern) {
+ maybeInitBuilder();
+ if (logPattern == null || logPattern.isEmpty()) {
+ builder.clearLogIncludePattern();
+ this.logIncludePatterns = null;
+ return;
+ }
+
+ this.logIncludePatterns = new HashSet<String>();
+ this.logIncludePatterns.addAll(logPattern);
+ }
+
+ private void initLogIncludePatterns() {
+ if (this.logIncludePatterns != null) {
+ return;
+ }
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ this.logIncludePatterns = new HashSet<String>();
+ this.logIncludePatterns.addAll(p.getLogIncludePatternList());
+ }
+
+ @Override
+ public Set<String> getLogExcludePatterns() {
+ initLogExcludePatterns();
+ return this.logExcludePatterns;
+ }
+
+ @Override
+ public void setLogExcludePatterns(Set<String> logPattern) {
+ maybeInitBuilder();
+ if (logPattern == null || logPattern.isEmpty()) {
+ builder.clearLogExcludePattern();
+ this.logExcludePatterns = null;
+ return;
+ }
+
+ this.logExcludePatterns = new HashSet<String>();
+ this.logExcludePatterns.addAll(logPattern);
+ }
+
+ private void initLogExcludePatterns() {
+ if (this.logExcludePatterns != null) {
+ return;
+ }
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ this.logExcludePatterns = new HashSet<String>();
+ this.logExcludePatterns.addAll(p.getLogExcludePatternList());
+ }
+
+ @Override
+ public long getLogInterval() {
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getLogMonitorInterval();
+ }
+
+ @Override
+ public void setLogInterval(long logInterval) {
+ Preconditions.checkArgument(logInterval >= 0);
+ maybeInitBuilder();
+ builder.setLogMonitorInterval(logInterval);
+ }
+}
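LogContextPBImpl follows YARN's usual lazy-merge PBImpl pattern: Java-side sets are folded into the protobuf builder only when getProto() is called. A minimal round-trip sketch, using only classes introduced by this patch:

import java.util.Collections;
import java.util.HashSet;

import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;

public class LogContextRoundTripSketch {
  public static void main(String[] args) {
    LogContextPBImpl original = new LogContextPBImpl();
    original.setLogIncludePatterns(
        new HashSet<String>(Collections.singleton("*.log")));
    original.setLogInterval(2);

    // getProto() merges the local pattern sets into the builder.
    LogContextProto proto = original.getProto();

    // Wrapping the proto again yields an equal record.
    LogContext restored = new LogContextPBImpl(proto);
    System.out.println(original.equals(restored)); // true
  }
}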
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 3568de2..11ea2c3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -24,6 +24,7 @@
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -35,10 +36,12 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-
+import java.util.Set;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,10 +63,15 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
@Public
@Evolving
public class AggregatedLogFormat {
@@ -149,6 +157,10 @@ public String toString() {
private final List<String> rootLogDirs;
private final ContainerId containerId;
private final String user;
+ private final LogContext logContext;
+ private Set<File> uploadedFiles = new HashSet<File>();
+ private final Set<String> alreadyUploadedLogFiles;
+ private Set<String> allExistingFileMeta = new HashSet<String>();
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
@@ -160,9 +172,24 @@ public LogValue(List<String> rootLogDirs, ContainerId containerId,
// Ensure logs are processed in lexical order
Collections.sort(this.rootLogDirs);
+ this.logContext = null;
+ this.alreadyUploadedLogFiles = new HashSet<String>();
}
- public void write(DataOutputStream out) throws IOException {
+ public LogValue(List<String> rootLogDirs, ContainerId containerId,
+ String user, LogContext logContext,
+ Set<String> alreadyUploadedLogFiles) {
+ this.rootLogDirs = new ArrayList<String>(rootLogDirs);
+ this.containerId = containerId;
+ this.user = user;
+
+ // Ensure logs are processed in lexical order
+ Collections.sort(this.rootLogDirs);
+ this.logContext = logContext;
+ this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
+ }
+
+ private int getNumOfLogFilesToUpload() {
for (String rootLogDir : this.rootLogDirs) {
File appLogDir =
new File(rootLogDir,
@@ -177,61 +204,131 @@ public void write(DataOutputStream out) throws IOException {
continue; // ContainerDir may have been deleted by the user.
}
- // Write out log files in lexical order
- File[] logFiles = containerLogDir.listFiles();
- Arrays.sort(logFiles);
- for (File logFile : logFiles) {
-
- final long fileLength = logFile.length();
-
- // Write the logFile Type
- out.writeUTF(logFile.getName());
-
- // Write the log length as UTF so that it is printable
- out.writeUTF(String.valueOf(fileLength));
-
- // Write the log itself
- FileInputStream in = null;
- try {
- in = SecureIOUtils.openForRead(logFile, getUser(), null);
- byte[] buf = new byte[65535];
- int len = 0;
- long bytesLeft = fileLength;
- while ((len = in.read(buf)) != -1) {
- //If buffer contents within fileLength, write
- if (len < bytesLeft) {
- out.write(buf, 0, len);
- bytesLeft-=len;
- }
- //else only write contents within fileLength, then exit early
- else {
- out.write(buf, 0, (int)bytesLeft);
- break;
- }
- }
- long newLength = logFile.length();
- if(fileLength < newLength) {
- LOG.warn("Aggregated logs truncated by approximately "+
- (newLength-fileLength) +" bytes.");
+ this.uploadedFiles.addAll(getFilteredLogFiles(containerLogDir));
+ }
+ return this.uploadedFiles.size();
+ }
+
+ public void write(DataOutputStream out) throws IOException {
+ List<File> fileList = new ArrayList<File>(this.uploadedFiles);
+ Collections.sort(fileList);
+
+ for (File logFile : fileList) {
+ final long fileLength = logFile.length();
+ // Write the logFile Type
+ out.writeUTF(logFile.getName());
+
+ // Write the log length as UTF so that it is printable
+ out.writeUTF(String.valueOf(fileLength));
+
+ // Write the log itself
+ FileInputStream in = null;
+ try {
+ in = SecureIOUtils.openForRead(logFile, getUser(), null);
+ byte[] buf = new byte[65535];
+ int len = 0;
+ long bytesLeft = fileLength;
+ while ((len = in.read(buf)) != -1) {
+ //If buffer contents within fileLength, write
+ if (len < bytesLeft) {
+ out.write(buf, 0, len);
+ bytesLeft-=len;
}
- } catch (IOException e) {
- String message = "Error aggregating log file. Log file : "
- + logFile.getAbsolutePath() + e.getMessage();
- LOG.error(message, e);
- out.write(message.getBytes());
- } finally {
- if (in != null) {
- in.close();
+ //else only write contents within fileLength, then exit early
+ else {
+ out.write(buf, 0, (int)bytesLeft);
+ break;
}
}
+ long newLength = logFile.length();
+ if(fileLength < newLength) {
+ LOG.warn("Aggregated logs truncated by approximately "+
+ (newLength-fileLength) +" bytes.");
+ }
+ } catch (IOException e) {
+ String message = "Error aggregating log file. Log file : "
+ + logFile.getAbsolutePath() + e.getMessage();
+ LOG.error(message, e);
+ out.write(message.getBytes());
+ } finally {
+ if (in != null) {
+ in.close();
+ }
}
}
}
-
+
// Added for testing purpose.
public String getUser() {
return user;
}
+
+ private Set<File> getFilteredLogFiles(File containerLogDir) {
+ Set<File> candidates =
+ new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
+ for (File logFile : candidates) {
+ this.allExistingFileMeta.add(fileMeta(logFile));
+ }
+
+ if (this.logContext != null && candidates.size() > 0) {
+ if (this.logContext.getLogIncludePatterns() != null
+ && !this.logContext.getLogIncludePatterns().isEmpty()) {
+ FileFilter includeLogFileFilter =
+ new WildcardFileFilter(new ArrayList<String>(
+ this.logContext.getLogIncludePatterns()));
+ candidates =
+ new HashSet<File>(Arrays.asList(containerLogDir
+ .listFiles(includeLogFileFilter)));
+ }
+
+ FileFilter excludeLogFileFilter = null;
+ if (this.logContext.getLogExcludePatterns() != null
+ && !this.logContext.getLogExcludePatterns().isEmpty()) {
+ excludeLogFileFilter =
+ new WildcardFileFilter(new ArrayList<String>(
+ this.logContext.getLogExcludePatterns()));
+ }
+ final Set<File> excludeFiles =
+ excludeLogFileFilter == null ? new HashSet<File>()
+ : new HashSet<File>(Arrays.asList(containerLogDir
+ .listFiles(excludeLogFileFilter)));
+ Iterable<File> mask =
+ Iterables.filter(candidates, new Predicate<File>() {
+ @Override
+ public boolean apply(File next) {
+ return !alreadyUploadedLogFiles.contains(fileMeta(next))
+ && !excludeFiles.contains(next);
+ }
+ });
+ candidates = Sets.newHashSet(mask);
+ }
+ return candidates;
+ }
+
+ public Set<Path> getCurrentUpLoadedFilesPath() {
+ Set<Path> path = new HashSet<Path>();
+ for (File file : this.uploadedFiles) {
+ path.add(new Path(file.getAbsolutePath()));
+ }
+ return path;
+ }
+
+ public Set<String> getCurrentUpLoadedFileMeta() {
+ Set<String> info = new HashSet<String>();
+ for (File file : this.uploadedFiles) {
+ info.add(fileMeta(file));
+ }
+ return info;
+ }
+
+ public Set<String> getAllExistingFilesMeta() {
+ return this.allExistingFileMeta;
+ }
+
+ private String fileMeta(File file) {
+ return containerId.toString() + "_" + file.getName() + "_"
+ + file.lastModified();
+ }
}
/**
@@ -242,16 +339,22 @@ public String getUser() {
private final FSDataOutputStream fsDataOStream;
private final TFile.Writer writer;
+ private final Path remoteAppLogFile;
+ private FileContext fc;
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
UserGroupInformation userUgi) throws IOException {
+ this.remoteAppLogFile = remoteAppLogFile;
try {
this.fsDataOStream =
userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
@Override
public FSDataOutputStream run() throws Exception {
- FileContext fc = FileContext.getFileContext(conf);
+ fc = FileContext.getFileContext(conf);
fc.setUMask(APP_LOG_FILE_UMASK);
+ if (fc.util().exists(remoteAppLogFile)) {
+ fc.delete(remoteAppLogFile, false);
+ }
return fc.create(
remoteAppLogFile,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
@@ -304,6 +407,14 @@ public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
}
public void append(LogKey logKey, LogValue logValue) throws IOException {
+ if (logValue.getNumOfLogFilesToUpload() == 0) {
+ // No new logs need to be uploaded at this time;
+ // delete the temporary remote app log file.
+ if (fc.util().exists(remoteAppLogFile)) {
+ fc.delete(remoteAppLogFile, false);
+ }
+ return;
+ }
DataOutputStream out = this.writer.prepareAppendKey(-1);
logKey.write(out);
out.close();
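The incremental upload hinges on the fileMeta key above (containerId + file name + last-modified time): a file whose key is already in alreadyUploadedLogFiles is skipped, while a file that has since been modified gets a fresh key and is uploaded again in full. A self-contained sketch of that keying rule; the static helper below mirrors the private fileMeta method but is illustrative, not the patch's code.

import java.io.File;
import java.util.HashSet;
import java.util.Set;

public class FileMetaSketch {
  // Illustrative copy of the private LogValue.fileMeta() key.
  static String fileMeta(String containerId, File f) {
    return containerId + "_" + f.getName() + "_" + f.lastModified();
  }

  public static void main(String[] args) throws Exception {
    File log = File.createTempFile("syslog", ".log");
    String cid = "container_1_0001_01_000001";

    Set<String> alreadyUploaded = new HashSet<String>();
    alreadyUploaded.add(fileMeta(cid, log));

    // Unchanged file: same key, so the next cycle skips it.
    System.out.println(alreadyUploaded.contains(fileMeta(cid, log))); // true

    // Modified file: new mtime gives a new key, forcing a re-upload.
    log.setLastModified(log.lastModified() + 1000);
    System.out.println(alreadyUploaded.contains(fileMeta(cid, log))); // false
  }
}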
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 4445ff9..b54792c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -28,8 +28,9 @@
@Private
public class LogAggregationUtils {
+ public static final String TMP_FILE_SUFFIX = "TEMP.tmp";
/**
- * Constructs the full filename for an application's log file per node.
+ * Get the remote node dir under remote app log dir.
* @param remoteRootLogDir
* @param appId
* @param user
@@ -37,7 +38,7 @@
* @param suffix
* @return the remote log file.
*/
- public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
+ public static Path getRemoteNodeLogDirForApp(Path remoteRootLogDir,
ApplicationId appId, String user, NodeId nodeId, String suffix) {
return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
getNodeString(nodeId));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 3bafdb3..01b64a7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -52,19 +52,50 @@ public int dumpAContainersLogs(String appId, String containerId,
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
- Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
+ Path remoteAppNodeLogDir = LogAggregationUtils.getRemoteNodeLogDirForApp(
remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
ConverterUtils.toNodeId(nodeId), suffix);
- AggregatedLogFormat.LogReader reader;
+ RemoteIterator<FileStatus> nodeFiles;
try {
- reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
- } catch (FileNotFoundException fnfe) {
- System.out.println("Logs not available at " + logPath.toString());
+ Path qualifiedLogDir =
+ FileContext.getFileContext(getConf()).makeQualified(
+ remoteAppNodeLogDir);
+ nodeFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(remoteAppNodeLogDir);
+ } catch (FileNotFoundException fnf) {
+ System.out.println("Logs not available at "
+ + remoteAppNodeLogDir.toString());
System.out
.println("Log aggregation has not completed or is not enabled.");
return -1;
}
- return dumpAContainerLogs(containerId, reader, System.out);
+ boolean foundContainerLogs = false;
+ while (nodeFiles.hasNext()) {
+ FileStatus thisNodeFile = nodeFiles.next();
+ String fileName = thisNodeFile.getPath().getName();
+ if (!fileName.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ AggregatedLogFormat.LogReader reader = null;
+ try {
+ reader =
+ new AggregatedLogFormat.LogReader(getConf(), new Path(
+ remoteAppNodeLogDir, thisNodeFile.getPath().getName()));
+ if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
+ foundContainerLogs = true;
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+ if (!foundContainerLogs) {
+ System.out.println("Logs for container " + containerId
+ + " are not present in this log-file.");
+ return -1;
+ }
+ return 0;
}
@Private
@@ -81,8 +112,6 @@ public int dumpAContainerLogs(String containerIdStr,
}
if (valueStream == null) {
- System.out.println("Logs for container " + containerIdStr
- + " are not present in this log-file.");
return -1;
}
@@ -119,37 +148,65 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
.println("Log aggregation has not completed or is not enabled.");
return -1;
}
+ boolean foundAnyLogs = false;
while (nodeFiles.hasNext()) {
- FileStatus thisNodeFile = nodeFiles.next();
- AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(
- getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
- try {
-
- DataInputStream valueStream;
- LogKey key = new LogKey();
- valueStream = reader.next(key);
-
- while (valueStream != null) {
- String containerString = "\n\nContainer: " + key + " on "
- + thisNodeFile.getPath().getName();
- out.println(containerString);
- out.println(StringUtils.repeat("=", containerString.length()));
- while (true) {
+ FileStatus thisNodeDir = nodeFiles.next();
+ if (thisNodeDir.isDirectory()) {
+ RemoteIterator<FileStatus> logFiles;
+ try {
+ Path qualifiedLogDir =
+ FileContext.getFileContext(getConf()).makeQualified(
+ thisNodeDir.getPath());
+ logFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
+ .listStatus(thisNodeDir.getPath());
+ } catch (FileNotFoundException fnf) {
+ continue;
+ }
+ while (logFiles.hasNext()) {
+ FileStatus logFile = logFiles.next();
+ if (!logFile.getPath().getName().contains(
+ LogAggregationUtils.TMP_FILE_SUFFIX)) {
+ AggregatedLogFormat.LogReader reader =
+ new AggregatedLogFormat.LogReader(getConf(),logFile.getPath());
try {
- LogReader.readAContainerLogsForALogType(valueStream, out);
- } catch (EOFException eof) {
- break;
+
+ DataInputStream valueStream;
+ LogKey key = new LogKey();
+ valueStream = reader.next(key);
+
+ while (valueStream != null) {
+ String containerString =
+ "\n\nContainer: " + key + " on "
+ + logFile.getPath().getName();
+ out.println(containerString);
+ out.println(StringUtils.repeat("=", containerString.length()));
+ while (true) {
+ try {
+ LogReader.readAContainerLogsForALogType(valueStream, out);
+ foundAnyLogs = true;
+ } catch (EOFException eof) {
+ break;
+ }
+ }
+
+ // Next container
+ key = new LogKey();
+ valueStream = reader.next(key);
+ }
+ } finally {
+ reader.close();
}
}
-
- // Next container
- key = new LogKey();
- valueStream = reader.next(key);
}
- } finally {
- reader.close();
}
}
+ if (!foundAnyLogs) {
+ System.out.println("Logs not available at " + remoteAppLogDir.toString());
+ System.out
+ .println("Log aggregation has not completed or is not enabled.");
+ return -1;
+ }
return 0;
}
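A hedged sketch of the remote layout the CLI now walks: with this patch the per-node entry returned by the renamed getRemoteNodeLogDirForApp is a directory, and each upload cycle writes a new <host>_<port>_<timestamp> file inside it. The root dir, user, and suffix below are hypothetical values.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;

public class RemoteLogLayoutSketch {
  public static void main(String[] args) {
    ApplicationId appId = ApplicationId.newInstance(0L, 1);
    NodeId nodeId = NodeId.newInstance("localhost", 1234);
    Path nodeDir = LogAggregationUtils.getRemoteNodeLogDirForApp(
        new Path("/tmp/logs"), appId, "hadoop", nodeId, "logs");
    // Prints /tmp/logs/hadoop/logs/application_0_0001/localhost_1234;
    // each cycle adds a file like localhost_1234_<timestamp> in this dir.
    System.out.println(nodeDir);
  }
}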
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
index 2b83e69..dbc7f02 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
@@ -30,7 +30,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -59,113 +62,121 @@
@Override
protected void render(Block html) {
- AggregatedLogFormat.LogReader reader = null;
- try {
- ContainerId containerId = verifyAndGetContainerId(html);
- NodeId nodeId = verifyAndGetNodeId(html);
- String appOwner = verifyAndGetAppOwner(html);
- LogLimits logLimits = verifyAndGetLogLimits(html);
- if (containerId == null || nodeId == null || appOwner == null
- || appOwner.isEmpty() || logLimits == null) {
- return;
- }
-
- ApplicationId applicationId = containerId.getApplicationAttemptId()
- .getApplicationId();
- String logEntity = $(ENTITY_STRING);
- if (logEntity == null || logEntity.isEmpty()) {
- logEntity = containerId.toString();
- }
+ ContainerId containerId = verifyAndGetContainerId(html);
+ NodeId nodeId = verifyAndGetNodeId(html);
+ String appOwner = verifyAndGetAppOwner(html);
+ LogLimits logLimits = verifyAndGetLogLimits(html);
+ if (containerId == null || nodeId == null || appOwner == null
+ || appOwner.isEmpty() || logLimits == null) {
+ return;
+ }
- if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
- html.h1()
- ._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
- ._();
- return;
- }
+ ApplicationId applicationId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ String logEntity = $(ENTITY_STRING);
+ if (logEntity == null || logEntity.isEmpty()) {
+ logEntity = containerId.toString();
+ }
- Path remoteRootLogDir = new Path(conf.get(
- YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
+ html.h1()
+ ._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
+ ._();
+ return;
+ }
- try {
- reader = new AggregatedLogFormat.LogReader(conf,
- LogAggregationUtils.getRemoteNodeLogFileForApp(remoteRootLogDir,
- applicationId, appOwner, nodeId,
- LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)));
- } catch (FileNotFoundException e) {
- // ACLs not available till the log file is opened.
- html.h1()
- ._("Logs not available for " + logEntity
- + ". Aggregation may not be complete, "
- + "Check back later or try the nodemanager at " + nodeId)._();
- return;
- } catch (IOException e) {
- html.h1()._("Error getting logs for " + logEntity)._();
- LOG.error("Error getting logs for " + logEntity, e);
- return;
- }
+ Path remoteRootLogDir = new Path(conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ Path remoteAppNodeLogDir = LogAggregationUtils.getRemoteNodeLogDirForApp(
+ remoteRootLogDir, applicationId, appOwner, nodeId,
+ LogAggregationUtils.getRemoteNodeLogDirSuffix(conf));
+ RemoteIterator<FileStatus> nodeFiles;
+ try {
+ Path qualifiedLogDir =
+ FileContext.getFileContext(conf).makeQualified(
+ remoteAppNodeLogDir);
+ nodeFiles =
+ FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
+ .listStatus(remoteAppNodeLogDir);
+ } catch (FileNotFoundException fnf) {
+ html.h1()
+ ._("Logs not available for " + logEntity
+ + ". Aggregation may not be complete, "
+ + "Check back later or try the nodemanager at " + nodeId)._();
+ return;
+ } catch (Exception ex) {
+ html.h1()
+ ._("Error getting logs at " + nodeId)._();
+ return;
+ }
- String owner = null;
- Map<ApplicationAccessType, String> appAcls = null;
- try {
- owner = reader.getApplicationOwner();
- appAcls = reader.getApplicationAcls();
- } catch (IOException e) {
- html.h1()._("Error getting logs for " + logEntity)._();
- LOG.error("Error getting logs for " + logEntity, e);
- return;
- }
- ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
- aclsManager.addApplication(applicationId, appAcls);
+ boolean foundLog = false;
+ String desiredLogType = $(CONTAINER_LOG_TYPE);
+ try {
+ while (nodeFiles.hasNext()) {
+ AggregatedLogFormat.LogReader reader = null;
+ try {
+ FileStatus thisNodeFile = nodeFiles.next();
+ reader = new AggregatedLogFormat.LogReader(conf, new Path(
+ remoteAppNodeLogDir, thisNodeFile.getPath().getName()));
+
+ String owner = null;
+ Map<ApplicationAccessType, String> appAcls = null;
+ try {
+ owner = reader.getApplicationOwner();
+ appAcls = reader.getApplicationAcls();
+ } catch (IOException e) {
+ LOG.error("Error getting logs for " + logEntity, e);
+ continue;
+ }
+ ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
+ aclsManager.addApplication(applicationId, appAcls);
- String remoteUser = request().getRemoteUser();
- UserGroupInformation callerUGI = null;
- if (remoteUser != null) {
- callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
- }
- if (callerUGI != null
- && !aclsManager.checkAccess(callerUGI,
+ String remoteUser = request().getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, owner, applicationId)) {
- html.h1()
- ._("User [" + remoteUser
- + "] is not authorized to view the logs for " + logEntity)._();
- return;
- }
+ html.h1()
+ ._("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity
+ + " in log file [" + thisNodeFile.getPath().getName() + "]")._();
+ LOG.error("User [" + remoteUser
+ + "] is not authorized to view the logs for " + logEntity);
+ continue;
+ }
- String desiredLogType = $(CONTAINER_LOG_TYPE);
- try {
- AggregatedLogFormat.ContainerLogsReader logReader = reader
+ AggregatedLogFormat.ContainerLogsReader logReader = reader
.getContainerLogsReader(containerId);
- if (logReader == null) {
- html.h1()
- ._("Logs not available for " + logEntity
- + ". Could be caused by the rentention policy")._();
- return;
- }
-
- boolean foundLog = readContainerLogs(html, logReader, logLimits,
- desiredLogType);
-
- if (!foundLog) {
- if (desiredLogType.isEmpty()) {
- html.h1("No logs available for container " + containerId.toString());
- } else {
- html.h1("Unable to locate '" + desiredLogType
- + "' log for container " + containerId.toString());
+ if (logReader == null) {
+ continue;
}
- return;
+
+ foundLog = readContainerLogs(html, logReader, logLimits,
+ desiredLogType);
+ } catch (IOException ex) {
+ LOG.error("Error getting logs for " + logEntity, ex);
+ continue;
+ } finally {
+ if (reader != null)
+ reader.close();
}
- } catch (IOException e) {
- html.h1()._("Error getting logs for " + logEntity)._();
- LOG.error("Error getting logs for " + logEntity, e);
- return;
}
- } finally {
- if (reader != null) {
- reader.close();
+ if (!foundLog) {
+ if (desiredLogType.isEmpty()) {
+ html.h1("No logs available for container " + containerId.toString());
+ } else {
+ html.h1("Unable to locate '" + desiredLogType
+ + "' log for container " + containerId.toString());
+ }
}
+ } catch (IOException e) {
+ html.h1()._("Error getting logs for " + logEntity)._();
+ LOG.error("Error getting logs for " + logEntity, e);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index c6572e9..caacef8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -178,6 +178,7 @@ public static void setup() throws Exception {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(LogContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 94902d4..092f6d1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -173,7 +173,7 @@ public void testNoLogs() throws Exception {
block.getWriter().flush();
String out = data.toString();
- assertTrue(out.contains("No logs available for container container_0_0001_01_000001"));
+ assertTrue(out.contains("No logs available"));
}
@@ -211,7 +211,8 @@ private void writeLog(Configuration configuration, String user)
ContainerId containerId = ContainerIdPBImpl.newInstance(appAttemptId, 1);
String path = "target/logs/" + user
- + "/logs/application_0_0001/localhost_1234";
+ + "/logs/application_0_0001/localhost_1234/localhost_1234_"
+ + System.currentTimeMillis();
File f = new File(path);
if (!f.getParentFile().exists()) {
assertTrue(f.getParentFile().mkdirs());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 12166e0..eb8b8c6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -72,9 +72,11 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -275,11 +277,16 @@ private void recoverApplication(ContainerManagerApplicationProto p)
aclProto.getAcl());
}
+ LogContext logContext = null;
+ if (p.hasLogContext()) {
+ logContext = new LogContextPBImpl(p.getLogContext());
+ }
+
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context);
context.getApplications().put(appId, app);
- app.handle(new ApplicationInitEvent(appId, acls));
+ app.handle(new ApplicationInitEvent(appId, acls, logContext));
}
@SuppressWarnings("unchecked")
@@ -719,13 +726,15 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
- Map<ApplicationAccessType, String> appAcls) {
+ Map<ApplicationAccessType, String> appAcls, LogContext logContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user);
-
+ if (logContext != null) {
+ builder.setLogContext(((LogContextPBImpl) logContext).getProto());
+ }
builder.clearCredentials();
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
@@ -828,10 +837,12 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
LOG.info("Creating a new application reference for app " + applicationID);
Map<ApplicationAccessType, String> appAcls =
container.getLaunchContext().getApplicationACLs();
+ LogContext logContext = container.getLaunchContext().getLogContext();
context.getNMStateStore().storeApplication(applicationID,
- buildAppProto(applicationID, user, credentials, appAcls));
+ buildAppProto(applicationID, user, credentials, appAcls,
+ logContext));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, appAcls));
+ new ApplicationInitEvent(applicationID, appAcls, logContext));
}
this.context.getNMStateStore().storeContainer(containerId, request);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index cc5544c..5eeb37b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -237,7 +237,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
- app.applicationACLs));
+ app.applicationACLs, initEvent.getLogContext()));
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
index 5746b6a..fdd828f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
@@ -22,18 +22,32 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogContext;
public class ApplicationInitEvent extends ApplicationEvent {
private final Map<ApplicationAccessType, String> applicationACLs;
+ private final LogContext logContext;
public ApplicationInitEvent(ApplicationId appId,
Map<ApplicationAccessType, String> acls) {
super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls;
+ this.logContext = null;
+ }
+
+ public ApplicationInitEvent(ApplicationId appId,
+ Map<ApplicationAccessType, String> acls, LogContext logContext) {
+ super(appId, ApplicationEventType.INIT_APPLICATION);
+ this.applicationACLs = acls;
+ this.logContext = logContext;
}
public Map<ApplicationAccessType, String> getApplicationACLs() {
return this.applicationACLs;
}
+
+ public LogContext getLogContext() {
+ return this.logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 1af48bb..37f14bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -20,14 +20,17 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,35 +39,45 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
- private static final String TMP_FILE_SUFFIX = ".tmp";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
private final ApplicationId appId;
+ private final NodeId nodeId;
private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
private final DeletionService delService;
private final UserGroupInformation userUgi;
- private final Path remoteNodeLogFileForApp;
- private final Path remoteNodeTmpLogFileForApp;
+ private final Path remoteNodeLogDirForApp;
+ private Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
private final BlockingQueue<ContainerId> pendingContainers;
@@ -72,53 +85,37 @@
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
-
- private LogWriter writer = null;
+ private final LogContext logContext;
+ private final Context context;
+ private String remoteLogFileName;
+ private Set<Path> currentUploadedFiles = new HashSet<Path>();
+ private Set<String> alreadyUploadedLogs = new HashSet<String>();
+ private Set<String> currentExistingLogFiles = new HashSet<String>();
public AppLogAggregatorImpl(Dispatcher dispatcher,
- DeletionService deletionService, Configuration conf, ApplicationId appId,
- UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler,
- Path remoteNodeLogFileForApp,
+ DeletionService deletionService, Configuration conf, NodeId nodeId,
+ ApplicationId appId, UserGroupInformation userUgi,
+ LocalDirsHandlerService dirsHandler, Path remoteNodeLogDirForApp,
ContainerLogsRetentionPolicy retentionPolicy,
- Map<ApplicationAccessType, String> appAcls) {
+ Map<ApplicationAccessType, String> appAcls, LogContext logContext,
+ Context context) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
+ this.nodeId = nodeId;
this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
this.dirsHandler = dirsHandler;
- this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
- this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
+ this.remoteNodeLogDirForApp = remoteNodeLogDirForApp;
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
+ this.logContext = logContext;
+ this.context = context;
}
- private void uploadLogsForContainer(ContainerId containerId) {
-
- if (this.logAggregationDisabled) {
- return;
- }
-
- // Lazy creation of the writer
- if (this.writer == null) {
- LOG.info("Starting aggregate log-file for app " + this.applicationId
- + " at " + this.remoteNodeTmpLogFileForApp);
- try {
- this.writer =
- new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
- this.userUgi);
- //Write ACLs once when and if the writer is created.
- this.writer.writeApplicationACLs(appAcls);
- this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
- } catch (IOException e) {
- LOG.error("Cannot create writer for app " + this.applicationId
- + ". Disabling log-aggregation for this app.", e);
- this.logAggregationDisabled = true;
- return;
- }
- }
+ private void uploadLogsForContainer(ContainerId containerId, LogWriter writer) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
@@ -126,13 +123,92 @@ private void uploadLogsForContainer(ContainerId containerId) {
LogKey logKey = new LogKey(containerId);
LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
- userUgi.getShortUserName());
+ userUgi.getShortUserName(), this.logContext, this.alreadyUploadedLogs);
try {
- this.writer.append(logKey, logValue);
+ writer.append(logKey, logValue);
} catch (IOException e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.");
}
+ currentUploadedFiles.addAll(logValue.getCurrentUpLoadedFilesPath());
+ alreadyUploadedLogs.addAll(logValue.getCurrentUpLoadedFileMeta());
+ currentExistingLogFiles.addAll(logValue.getAllExistingFilesMeta());
+ }
+
+ private void uploadLogsForContainers(Set<ContainerId> containers) {
+ if (this.logAggregationDisabled || containers.isEmpty()) {
+ return;
+ }
+ this.remoteLogFileName =
+ nodeId.toString().replace(":", "_") + "_" + System.currentTimeMillis();
+ this.remoteNodeTmpLogFileForApp =
+ getRemoteNodeTmpLogFileForApp(this.remoteNodeLogDirForApp,
+ remoteLogFileName);
+
+ LogWriter writer = null;
+ try {
+ try {
+ writer =
+ new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
+ this.userUgi);
+ // Write ACLs once when the writer is created.
+ writer.writeApplicationACLs(appAcls);
+ writer.writeApplicationOwner(this.userUgi.getShortUserName());
+
+ } catch (IOException e1) {
+ LOG.error("Cannot create writer for app " + this.applicationId
+ + ". Skip log upload this time. ");
+ return;
+ }
+
+ currentUploadedFiles.clear();
+ currentExistingLogFiles.clear();
+ for (ContainerId container : containers) {
+ uploadLogsForContainer(container, writer);
+ }
+
+ this.delService.delete(this.userUgi.getShortUserName(), null,
+ currentUploadedFiles.toArray(new Path[currentUploadedFiles.size()]));
+
+ // If any of the previously uploaded logs have been deleted,
+ // we need to remove them from alreadyUploadedLogs.
+ Iterable<String> mask =
+ Iterables.filter(alreadyUploadedLogs, new Predicate<String>() {
+ @Override
+ public boolean apply(String next) {
+ return currentExistingLogFiles.contains(next);
+ }
+ });
+
+ alreadyUploadedLogs = Sets.newHashSet(mask);
+
+ if (writer != null) {
+ writer.close();
+ }
+
+ try {
+ userUgi.doAs(new PrivilegedExceptionAction