Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java (revision 1479472)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java (working copy)
@@ -264,6 +264,19 @@
       logValue.write(out);
       out.close();
     }
+
+    public void append(LogKey logKey, DataInputStream in) throws IOException {
+      DataOutputStream out = this.writer.prepareAppendKey(-1);
+      logKey.write(out);
+      out.close();
+      out = this.writer.prepareAppendValue(-1);
+      byte[] buf = new byte[65535];
+      int len = 0;
+      while ((len = in.read(buf)) != -1) {
+        out.write(buf, 0, len);
+      }
+      out.close();
+    }
 
     public void closeWriter() {
       try {
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (revision 1479472)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (working copy)
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.text.NumberFormat;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,8 +33,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,6 +46,7 @@
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -71,6 +79,9 @@
   private final AtomicBoolean appFinishing = new AtomicBoolean();
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
+  private final Set<ContainerId> completedContainers;
+  private boolean needToCheckLocalDirsForAggregation;
+  private String containerPrefix;
 
   private LogWriter writer = null;
 
@@ -92,8 +103,18 @@
     this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
+    this.completedContainers = new HashSet<ContainerId>();
+    this.containerPrefix = getContainerPrefix(appId);
   }
 
+  private static String getContainerPrefix(ApplicationId appId) {
+    NumberFormat fmt = NumberFormat.getInstance();
+    fmt.setGroupingUsed(false);
+    fmt.setMinimumIntegerDigits(4);
+    return "container_" + appId.getClusterTimestamp() + "_"
+        + fmt.format(appId.getId()) + "_";
+  }
+
   private void uploadLogsForContainer(ContainerId containerId) {
 
     if (this.logAggregationDisabled) {
@@ -111,7 +132,8 @@
         //Write ACLs once when and if the writer is created.
         this.writer.writeApplicationACLs(appAcls);
         this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
-      } catch (IOException e) {
+        checkAndCopyPreviousLogs();
+      } catch (Exception e) {
         LOG.error("Cannot create writer for app " + this.applicationId
             + ". Disabling log-aggregation for this app.", e);
         this.logAggregationDisabled = true;
@@ -132,6 +154,38 @@
     }
   }
 
+  private void checkAndCopyPreviousLogs() throws IOException,
+      InterruptedException {
+    userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        try {
+          FileSystem remoteFS = FileSystem.get(conf);
+          if (remoteFS.exists(remoteNodeLogFileForApp)) {
+            needToCheckLocalDirsForAggregation = true;
+            LogReader reader = new LogReader(conf, remoteNodeLogFileForApp);
+            LogKey key = new LogKey();
+            DataInputStream valueStream = reader.next(key);
+            while (valueStream != null
+                && key.toString().startsWith(containerPrefix)) {
+              LOG.info("Copying logs from previous file to new file for container: "
+                  + key.toString());
+              writer.append(key, valueStream);
+              valueStream = reader.next(key);
+            }
+            remoteFS.delete(remoteNodeLogFileForApp, true);
+          } else if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
+            needToCheckLocalDirsForAggregation = true;
+          }
+        } catch (Exception e) {
+          LOG.error("Couldn't check for the existing aggregated log files.",
+              e);
+        }
+        return null;
+      }
+    });
+  }
+
   @Override
   public void run() {
     try {
@@ -172,6 +226,34 @@
       localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
       index++;
     }
+    if (needToCheckLocalDirsForAggregation) {
+      LOG.info("Checking local log dirs for missed "
+          + "container aggregation. Not considering the status "
+          + "of the container for aggregation due to status unavailability.");
+      Set<ContainerId> oldContainers = new HashSet<ContainerId>();
+      try {
+        FileContext context = FileContext.getLocalFSFileContext();
+        for (Path localAppLogDir : localAppLogDirs) {
+          RemoteIterator<FileStatus> listStatus = context
+              .listStatus(localAppLogDir);
+          while (listStatus.hasNext()) {
+            FileStatus fileStatus = listStatus.next();
+            String name = fileStatus.getPath().getName();
+            ContainerId id = ConverterUtils.toContainerId(name);
+            if (completedContainers.contains(id) == false) {
+              oldContainers.add(id);
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error occurred while checking for "
+            + "missed containers for aggregation", e);
+      }
+      for (ContainerId oldContainer : oldContainers) {
+        LOG.info("Uploading logs for missed container: " + oldContainer);
+        uploadLogsForContainer(oldContainer);
+      }
+    }
 
     this.delService.delete(this.userUgi.getShortUserName(), null,
         localAppLogDirs);
@@ -245,6 +327,7 @@
           + " for log-aggregation");
       this.pendingContainers.add(containerId);
     }
+    this.completedContainers.add(containerId);
   }
 
   @Override
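
Note: the following is an illustrative, self-contained sketch (not part of the patch) showing how the prefix built by getContainerPrefix() above recognizes LogKeys previously aggregated for the same application; the cluster timestamp and ids used here are hypothetical.

import java.text.NumberFormat;

public class ContainerPrefixExample {
  public static void main(String[] args) {
    // Hypothetical application identity: cluster timestamp and application id.
    long clusterTimestamp = 1367447543152L;
    int appId = 1;

    // Same formatting as getContainerPrefix(): app id zero-padded to 4 digits.
    NumberFormat fmt = NumberFormat.getInstance();
    fmt.setGroupingUsed(false);
    fmt.setMinimumIntegerDigits(4);
    String containerPrefix =
        "container_" + clusterTimestamp + "_" + fmt.format(appId) + "_";

    // A LogKey stringifies to the ContainerId, so keys for containers of this
    // application start with the prefix; keys of other applications do not.
    String sameApp = "container_1367447543152_0001_01_000002";
    String otherApp = "container_1367447543152_0002_01_000001";
    System.out.println(sameApp.startsWith(containerPrefix));  // true
    System.out.println(otherApp.startsWith(containerPrefix)); // false
  }
}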