Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java	(revision 1528266)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -61,6 +61,7 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.yarn.util.LogUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -1011,6 +1012,7 @@
         LOG.warn("Failure sending commit pending: " +
             StringUtils.stringifyException(ie));
         if (--retries == 0) {
+          LogUtils.triggerFlush(LOG);
           System.exit(67);
         }
       }
@@ -1108,6 +1110,7 @@
   }
 
   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
+    LogUtils.triggerFlush(LOG);
     int retries = MAX_RETRIES;
     while (true) {
       try {
Index: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
===================================================================
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java	(revision 1528266)
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java	(working copy)
@@ -19,7 +19,11 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -27,11 +31,17 @@
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.yarn.util.LogUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.PropertyConfigurator;
 
 public class TestTaskCommit extends HadoopTestCase {
   Path rootDir = 
     new Path(System.getProperty("test.build.data", "/tmp"), "test");
 
+  private static final Log LOG =
+      LogFactory.getLog(TestTaskCommit.class);
+
   static class CommitterWithCommitFail extends FileOutputCommitter {
     public void commitTask(TaskAttemptContext context) throws IOException {
       Path taskOutputPath = getTaskAttemptPath(context);
@@ -277,7 +287,79 @@
     return testTask;
   }
 
+  /**
+   * A test to make sure the log is correctly written and truncated to the
+   * configured size limit.
+   */
+  public void testTaskCommitLogFlush() throws Exception {
+    // Init log4j the way the container launcher does, e.g.:
+    //   -Dlog4j.configuration=container-log4j.properties
+    //   -Dyarn.app.mapreduce.container.log.dir=/Users/phan/.../ntainer_1377715051394_0002_01_000001
+    //   -Dyarn.app.mapreduce.container.log.filesize=2097152
+    //   -Dhadoop.root.logger=DEBUG,CLA
+    System.setProperty("log4j.configuration", "container-log4j.properties");
+    System.setProperty("hadoop.root.logger", "DEBUG,CLA");
+    Path tmpLogDir = new Path(rootDir, "tmplog");
+    System.setProperty("yarn.app.mapreduce.container.log.dir",
+        tmpLogDir.toString());
+    final String userLogSizeLimitInBytes = "2000";
+    System.setProperty("yarn.app.mapreduce.container.log.filesize",
+        userLogSizeLimitInBytes);
+    LogManager.resetConfiguration();
+    InputStream log4jPropsAsStream =
+        this.getClass().getResourceAsStream("/container-log4j.properties");
+    PropertyConfigurator.configure(log4jPropsAsStream);
+
+    // Mimic a job with a special committer that does not clean up
+    // files when a task fails.
+    JobConf job = new JobConf();
+    job.setOutputCommitter(CommitterWithoutCleanup.class);
+    Path outDir = new Path(rootDir, "output");
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    // Mimic job setup
+    String dummyAttemptID = "attempt_201307121733_0001_m_000000_0";
+    TaskAttemptID attemptID = TaskAttemptID.forName(dummyAttemptID);
+    OutputCommitter committer = new CommitterWithoutCleanup();
+    JobContext jContext = new JobContextImpl(job, attemptID.getJobID());
+    committer.setupJob(jContext);
+
+    // Mimic a map task
+    dummyAttemptID = "attempt_201307121733_0001_m_000001_0";
+    attemptID = TaskAttemptID.forName(dummyAttemptID);
+    Task task = new MapTask(null, attemptID, 0, null, 1);
+    task.setConf(job);
+    task.localizeConfiguration(job);
+    task.initialize(job, attemptID.getJobID(), Reporter.NULL, false);
+
+    // Mimic the map task writing some output.
+    String file = "testLogFlush.txt";
+    FileSystem localFs = FileSystem.getLocal(job);
+    TextOutputFormat<Text, Text> theOutputFormat =
+        new TextOutputFormat<Text, Text>();
+    RecordWriter<Text, Text> theRecordWriter =
+        theOutputFormat.getRecordWriter(localFs, job, file, Reporter.NULL);
+    theRecordWriter.write(new Text("key"), new Text("value"));
+    theRecordWriter.close(Reporter.NULL);
+
+    // Write enough log output to overflow the 2000-byte limit: 80 numbered
+    // lines of 100 'a' characters each.
+    char[] logLine = new char[100];
+    Arrays.fill(logLine, 'a');
+    String logStr = new String(logLine);
+    for (int i = 1; i < 81; i++) {
+      LOG.info(i);
+      LOG.info(logStr);
+    }
+
+    task.setTaskCleanupTask();
+
+    MyUmbilical umbilical = new MyUmbilical();
+    task.run(job, umbilical);
+    LogUtils.triggerFlush(LOG, new Exception());
+    LogManager.shutdown();
+    // Log file truncation is approximate, so allow a 3000-character buffer
+    // above the configured limit.
+    assertTrue("Log file truncation failed!",
+        localFs.getFileStatus(new Path(tmpLogDir, "syslog")).getLen()
+            < Integer.parseInt(userLogSizeLimitInBytes) + 3000);
+  }
+
   public static void main(String[] argv) throws Exception {
     TestTaskCommit td = new TestTaskCommit();
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java	(revision 1528266)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java	(working copy)
@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn;
 
 import java.io.File;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
 
+import org.apache.hadoop.yarn.util.LogUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.log4j.FileAppender;
@@ -44,6 +46,22 @@
     synchronized (this) {
       if (maxEvents > 0) {
         tail = new LinkedList<LoggingEvent>();
+
+        // Try to salvage as much of the buffered tail as possible when the
+        // JVM is killed.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+          @Override
+          public void run() {
+            // Lock the appender itself (not the hook thread) so we exclude
+            // concurrent append()/flush() calls.
+            synchronized (ContainerLogAppender.this) {
+              if (tail != null && !tail.isEmpty()) {
+                System.err.println(ContainerLogAppender.class.getName()
+                    + " Error: logger buffer is not empty when shutting down."
+                    + " Log may be truncated! Log entries remaining: " + tail.size()
+                    + ". Starting with: " + tail.peek().getMessage());
+              }
+              flush();
+            }
+          }
+        });
       }
       setFile(new File(this.containerLogDir, "syslog").toString());
       setAppend(true);
@@ -61,11 +79,20 @@
           tail.remove();
         }
         tail.add(event);
+        processFlushEvent(event);
       }
     }
   }
-  
-  public void flush() {
+
+  public synchronized void flush() {
+    // Drain the buffered tail to the underlying appender before flushing it.
+    if (tail != null) {
+      Iterator<LoggingEvent> iter = tail.iterator();
+      while (iter.hasNext()) {
+        LoggingEvent event = iter.next();
+        super.append(event);
+        iter.remove();
+      }
+    }
     if (qw != null) {
       qw.flush();
     }
@@ -73,14 +100,22 @@
 
   @Override
   public synchronized void close() {
-    if (tail != null) {
-      for (LoggingEvent event : tail) {
-        super.append(event);
-      }
-    }
+    flush();
     super.close();
   }
 
+  private void processFlushEvent(LoggingEvent event) {
+    if (event.getMessage() == null || !event.getMessage().equals(LogUtils.MESSAGE_FLUSH_LOG_NOW)) {
+      return;
+    }
+    // Print the call-site stack if one was passed in, e.g. via LogUtils.triggerFlush(LOG, new Exception()).
+    if (event.getThrowableInformation() != null
+        && event.getThrowableInformation().getThrowable() != null) {
+      event.getThrowableInformation().getThrowable().printStackTrace(System.out);
+    }
+    flush();
+  }
+
   /**
    * Getter/Setter methods for log4j.
    */
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LogUtils.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LogUtils.java	(revision 0)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LogUtils.java	(revision 0)
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Helpers that trigger log flushing by emitting a pre-defined log message,
+ * recognized by the {@link org.apache.hadoop.yarn.ContainerLogAppender} class.
+ */
+public class LogUtils {
+
+  public static final String MESSAGE_FLUSH_LOG_NOW = "FLUSH_LOG_NOW";
+
+  public static void triggerFlush(Log logger) {
+    logger.info(MESSAGE_FLUSH_LOG_NOW);
+  }
+
+  public static void triggerFlush(Log logger, Exception e) {
+    logger.info(MESSAGE_FLUSH_LOG_NOW, e);
+  }
+}
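
For reviewers, a minimal usage sketch (not part of the patch) of how task-side code is expected to drive the new flush path. The class name FlushBeforeExitExample and its first log message are made up for illustration; LogUtils.triggerFlush, the FLUSH_LOG_NOW sentinel, and exit code 67 all come from the hunks above.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.util.LogUtils;

// Hypothetical caller: any task-side code about to terminate the JVM that
// wants the ContainerLogAppender tail buffer written to disk first.
public class FlushBeforeExitExample {
  private static final Log LOG = LogFactory.getLog(FlushBeforeExitExample.class);

  public static void main(String[] args) {
    LOG.info("this event may still sit in the in-memory tail buffer");
    // Emits the FLUSH_LOG_NOW sentinel; ContainerLogAppender.processFlushEvent()
    // recognizes it and drains the tail queue to the syslog file.
    LogUtils.triggerFlush(LOG);
    // Variant that also prints the call site's stack trace to stdout:
    // LogUtils.triggerFlush(LOG, new Exception());
    System.exit(67);
  }
}

Without the triggerFlush call, events still queued in the appender's tail would be lost on System.exit, since log4j shutdown hooks are not guaranteed to run before the JVM dies.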