diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 015baa1..25051df 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -755,10 +755,37 @@ private static void addDeprecatedKeys() {
    * The remote log dir will be created at
    * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
    */
-  public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = 
+  public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX =
       NM_PREFIX + "remote-app-log-dir-suffix";
   public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs";
 
+  /** Whether to enable aggregated log compaction. */
+  public static final String AGGREGATION_LOG_COMPACTION_ENABLED = YARN_PREFIX
+      + "aggregated-log-compaction-enable";
+  public static final boolean DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED =
+      false;
+
+  /** Aggregated log compaction ZooKeeper connection string. */
+  public static final String AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING =
+      YARN_PREFIX + "aggregated-log-compaction-zookeeper-connection-string";
+  public static final String DEFAULT_AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING =
+      "localhost:2181";
+
+  /** Aggregated log compaction distributed lock location in ZooKeeper. */
+  public static final String AGGREGATION_LOG_COMPACTION_LOCK_LOCATION =
+      YARN_PREFIX + "aggregated-log-compaction-lock-location";
+  public static final String DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_LOCATION =
+      "/yarn/log/compaction";
+
+  /**
+   * Aggregated log compaction distributed lock reaper leader election
+   * location in ZooKeeper.
+   */
+  public static final String AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION =
+      YARN_PREFIX + "aggregated-log-compaction-lock-reaper-leader-location";
+  public static final String DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION =
+      "/yarn/log/compaction_leader";
+
   public static final String YARN_LOG_SERVER_URL =
     YARN_PREFIX + "log.server.url";
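The four keys above are ordinary Configuration entries; the ZooKeeper ones only matter once compaction is switched on. A minimal sketch of how a consumer of these settings might bootstrap the Curator session they describe — the builder call mirrors what LogAggregationService does later in this patch, while the class and method names here are illustrative, not part of the patch:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical helper, not part of the patch.
public class CompactionSettings {
  /** Returns a started Curator client, or null when compaction is disabled. */
  public static CuratorFramework maybeStartCurator(Configuration conf) {
    if (!conf.getBoolean(YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED,
        YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED)) {
      return null;
    }
    String connectString = conf.get(
        YarnConfiguration.AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING,
        YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING);
    // Same retry policy LogAggregationService uses later in this patch.
    CuratorFramework curator = CuratorFrameworkFactory.builder()
        .connectString(connectString)
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .build();
    curator.start();
    return curator;
  }
}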
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index 4c1d152..811d5a5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 /**
- * A service that periodically deletes aggregated logs.
+ * A service that periodically deletes aggregated and compacted aggregated logs.
  */
 @InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
 public class AggregatedLogDeletionService extends AbstractService {
@@ -76,7 +76,7 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient
     @Override
     public void run() {
       long cutoffMillis = System.currentTimeMillis() - retentionMillis;
-      LOG.info("aggregated log deletion started.");
+      LOG.info("aggregated and compacted aggregated log deletion started.");
       try {
         FileSystem fs = remoteRootLogDir.getFileSystem(conf);
         for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
@@ -89,7 +89,7 @@ public void run() {
         logIOException("Error reading root log dir this deletion " +
             "attempt is being aborted", e);
       }
-      LOG.info("aggregated log deletion finished.");
+      LOG.info("aggregated and compacted aggregated log deletion finished.");
     }
 
     private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
@@ -103,7 +103,8 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis,
               .getPath().getName()), rmClient);
           if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
             try {
-              LOG.info("Deleting aggregated logs in "+appDir.getPath());
+              LOG.info("Deleting aggregated and compacted aggregated logs in "
+                  +appDir.getPath());
               fs.delete(appDir.getPath(), true);
             } catch (IOException e) {
               logIOException("Could not delete "+appDir.getPath(), e);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index b669332..2ad1063 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -462,7 +462,7 @@ public void close() {
 
   @Public
   @Evolving
-  public static class LogReader {
+  public static class LogReader implements LogFormatReader {
 
     private final FSDataInputStream fsDataIStream;
     private final TFile.Reader.Scanner scanner;
@@ -486,6 +486,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
      * @return the application owner.
      * @throws IOException
      */
+    @Override
     public String getApplicationOwner() throws IOException {
       TFile.Reader.Scanner ownerScanner = reader.createScanner();
       LogKey key = new LogKey();
@@ -508,6 +509,7 @@ public String getApplicationOwner() throws IOException {
      * @return a map of the Application ACLs.
      * @throws IOException
      */
+    @Override
     public Map<ApplicationAccessType, String> getApplicationAcls() throws IOException {
       // TODO Seek directly to the key once a comparator is specified.
@@ -579,6 +581,7 @@ public DataInputStream next(LogKey key) throws IOException {
      * @throws IOException
      */
     @Private
+    @Override
     public ContainerLogsReader getContainerLogsReader(
         ContainerId containerId) throws IOException {
       ContainerLogsReader logReader = null;
@@ -742,6 +745,7 @@ public static void readAContainerLogsForALogType(
       readAContainerLogsForALogType(valueStream, out, -1);
     }
 
+    @Override
     public void close() {
       IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java
new file mode 100644
index 0000000..7ab43ab
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CompactedAggregatedLogFormat {
+
+  private static final int CURRENT_VERSION = 1;
+  private static final String DELIMITER = ",";
+  private static final String LINE_DELIMITER = System.lineSeparator();
+
+  /**
+   * Umask for the log file.
+   */
+  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+      .createImmutable((short) (0640 ^ 0777));
+
+  public CompactedAggregatedLogFormat() {
+  }
+
+  public static class LogWriter {
+
+    private final FSDataOutputStream fsDataOStream;
+    private final OutputStreamWriter indexWriter;
+    private FileContext fc;
+    private long currentOffset;
+    private int indexLineCount;
+
+    public LogWriter(final Configuration conf, final Path indexFile,
+        final Path logFile, final UserGroupInformation userUgi)
+        throws IOException {
+      try {
+        this.fc =
+            userUgi.doAs(new PrivilegedExceptionAction<FileContext>() {
+              @Override
+              public FileContext run() throws Exception {
+                fc = FileContext.getFileContext(conf);
+                fc.setUMask(APP_LOG_FILE_UMASK);
+                return fc;
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+
+      try {
+        currentOffset = fc.getFileStatus(logFile).getLen();
+
+        // Verify the header is not missing any lines
+        if (currentOffset > 0L) {
+          Scanner indexScanner = null;
+          try {
+            indexScanner = new Scanner(fc.open(indexFile));
+            indexLineCount = 0;
+            while (indexLineCount < 3 && indexScanner.hasNextLine()) {
+              indexScanner.nextLine();
+              indexLineCount++;
+            }
+          } finally {
+            if (indexScanner != null) {
+              indexScanner.close();
+            }
+          }
+        }
+      } catch (FileNotFoundException fnfe) {
+        currentOffset = 0L;
+        indexLineCount = 0;
+      }
+
+      this.fsDataOStream = fc.create(
+          logFile,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+          new Options.CreateOpts[]{});
+
+      indexWriter = new OutputStreamWriter(fc.create(
+          indexFile,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+          new Options.CreateOpts[]{}));
+    }
+
+    private void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
+        throws IOException {
+      indexWriter.write(LINE_DELIMITER + appAcls.size());
+      for (Map.Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
+        indexWriter.write(DELIMITER + entry.getKey()
+            + DELIMITER + entry.getValue());
+      }
+    }
+
+    public void append(AggregatedLogFormat.LogReader reader)
+        throws IOException {
+      if (indexLineCount < 3) {
+        // Resume the three-line header (version, ACLs, owner) wherever a
+        // previous writer left off; each case intentionally falls through.
+        switch (indexLineCount) {
+        case 0:
+          indexWriter.write(Integer.toString(CURRENT_VERSION));
+          indexLineCount++;
+          // fall through
+        case 1:
+          writeApplicationACLs(reader.getApplicationAcls());
+          indexLineCount++;
+          // fall through
+        case 2:
+          indexWriter.write(LINE_DELIMITER + reader.getApplicationOwner());
+          indexLineCount++;
+        }
+      }
+
+      AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
+      DataInputStream valueStream = reader.next(key);
+      while (valueStream != null) {
+        indexWriter.write(LINE_DELIMITER + key);
+        long length = IOUtils.copyLarge(valueStream, fsDataOStream);
+        indexWriter.write(DELIMITER + currentOffset + DELIMITER + length);
+        currentOffset += length;
+
+        // Next container
+        key = new AggregatedLogFormat.LogKey();
+        valueStream = reader.next(key);
+      }
+    }
+
+    public void close() {
+      org.apache.hadoop.io.IOUtils.closeStream(indexWriter);
+      org.apache.hadoop.io.IOUtils.closeStream(fsDataOStream);
+    }
+  }
+
+  public static class LogReader implements LogFormatReader {
+
+    private final FSDataInputStream fsDataIStream;
+    private Map<ApplicationAccessType, String> acls;
+    private String owner;
+    private Map<String, IndexEntry> index;
+
+    public LogReader(Configuration conf, Path indexFile, Path logFile)
+        throws IOException {
+      FileContext fileContext = FileContext.getFileContext(conf);
+
+      this.fsDataIStream = fileContext.open(logFile);
+
+      Scanner indexScanner = null;
+      try {
+        indexScanner = new Scanner(fileContext.open(indexFile));
+        loadIndexFile(indexScanner);
+      } finally {
+        org.apache.hadoop.io.IOUtils.closeStream(indexScanner);
+      }
+    }
+
+    @Override
+    public Map<ApplicationAccessType, String> getApplicationAcls() {
+      return acls;
+    }
+
+    @Override
+    public String getApplicationOwner() {
+      return owner;
+    }
+
+    private void readApplicationACLs(Scanner indexScanner) throws IOException {
+      String line = indexScanner.nextLine();
+      Scanner sc = new Scanner(line);
+      sc.useDelimiter(DELIMITER);
+      int numAcls = sc.nextInt();
+      acls = new HashMap<ApplicationAccessType, String>(numAcls);
+      for (int i = 0; i < numAcls; i++) {
+        String appAccessOp = sc.next();
+        String aclString = sc.next();
+        acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
+      }
+    }
+
+    private void loadIndexFile(Scanner indexScanner) throws IOException {
+      String line = indexScanner.nextLine();
+      int version = Integer.parseInt(line);
+      if (version != CURRENT_VERSION) {
+        throw new IOException("Incorrect version: expected " + CURRENT_VERSION
+            + " but found " + version);
+      }
+      readApplicationACLs(indexScanner);
+      owner = indexScanner.nextLine();
+
+      index = new HashMap<String, IndexEntry>();
+      while (indexScanner.hasNextLine()) {
+        Scanner sc = new Scanner(indexScanner.nextLine());
+        sc.useDelimiter(DELIMITER);
+        try {
+          String containerId = sc.next();
+          long offset = sc.nextLong();
+          long length = sc.nextLong();
+          index.put(containerId, new IndexEntry(offset, length));
+        } catch (Exception e) {
+          // this entry is invalid or incomplete; skip it
+        }
+      }
+    }
+
+    @VisibleForTesting
+    public Set<String> getContainerIds() {
+      return index.keySet();
+    }
+
+    /**
+     * Get a ContainerLogsReader to read the logs for
+     * the specified container.
+     *
+     * @param containerId
+     * @return object to read the container's logs or null if the
+     *         logs could not be found
+     * @throws IOException
+     */
+    @InterfaceAudience.Private
+    @Override
+    public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader(
+        ContainerId containerId) throws IOException {
+      AggregatedLogFormat.ContainerLogsReader logReader = null;
+      IndexEntry iEntry = index.get(containerId.toString());
+      if (iEntry != null) {
+        fsDataIStream.seek(iEntry.getOffset());
+        BoundedInputStream boundedFsDataIStream =
+            new BoundedInputStream(fsDataIStream, iEntry.getLength());
+        logReader = new AggregatedLogFormat.ContainerLogsReader(
+            new DataInputStream(boundedFsDataIStream));
+      }
+      return logReader;
+    }
+
+    @Override
+    public void close() {
+      org.apache.hadoop.io.IOUtils.closeStream(fsDataIStream);
+    }
+  }
+
+  private static class IndexEntry {
+    private final long offset;
+    private final long length;
+
+    public IndexEntry(long offset, long length) {
+      this.offset = offset;
+      this.length = length;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    public long getLength() {
+      return length;
+    }
+  }
+}
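The writer above produces two files: a data file holding the concatenated per-container log payloads, and a line-oriented index (version, ACL count plus entries, owner, then one containerId,offset,length record per appended container). A minimal sketch of reading a compacted log back, assuming both files were produced by LogWriter.append(); the paths and application id are hypothetical, while ContainerLogsReader's nextLog()/getCurrentLogLength() come from the existing AggregatedLogFormat API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.CompactedAggregatedLogFormat;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class CompactedLogDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical paths; the data file sits under the app's remote log dir
    // and the index beside it with a ".index" suffix.
    Path logFile = new Path("/tmp/logs/alice/logs/"
        + "application_1400000000000_0001/application_1400000000000_0001");
    Path indexFile = logFile.suffix(".index");
    CompactedAggregatedLogFormat.LogReader reader =
        new CompactedAggregatedLogFormat.LogReader(conf, indexFile, logFile);
    try {
      for (String id : reader.getContainerIds()) {
        AggregatedLogFormat.ContainerLogsReader logs =
            reader.getContainerLogsReader(ConverterUtils.toContainerId(id));
        // Each container entry holds one or more named log types in sequence.
        for (String type = logs.nextLog(); type != null; type = logs.nextLog()) {
          System.out.println(id + "/" + type + ": "
              + logs.getCurrentLogLength() + " bytes");
        }
      }
    } finally {
      reader.close();
    }
  }
}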
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 34c9100..384f68a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -100,6 +100,17 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) {
         YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
   }
 
+  public static Path getRemoteCompactedAggregatedLogFileForApp(
+      Path remoteRootLogDir, ApplicationId appId, String user, String suffix) {
+    return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
+        appId.toString());
+  }
+
+  public static Path getRemoteCompactedAggregatedLogIndexFileForApp(
+      Path remoteRootLogDir, ApplicationId appId, String user, String suffix) {
+    return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
+        appId.toString() + ".index");
+  }
 
   /**
    * Converts a nodeId to a form used in the app log file name.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java
new file mode 100644
index 0000000..6ac440a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface LogFormatReader {
+
+  String getApplicationOwner() throws IOException;
+
+  Map<ApplicationAccessType, String> getApplicationAcls()
+      throws IOException;
+
+  AggregatedLogFormat.ContainerLogsReader getContainerLogsReader(
+      ContainerId containerId) throws IOException;
+
+  void close();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
index 620d097..19b42fa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
@@ -41,7 +41,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.CompactedAggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogFormatReader;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Times;
@@ -90,102 +92,149 @@ protected void render(Block html) {
     Path remoteRootLogDir = new Path(conf.get(
         YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogDir, applicationId, appOwner,
-        LogAggregationUtils.getRemoteNodeLogDirSuffix(conf));
-    RemoteIterator<FileStatus> nodeFiles;
-    try {
-      Path qualifiedLogDir =
-          FileContext.getFileContext(conf).makeQualified(
-              remoteAppDir);
-      nodeFiles =
-          FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
-              .listStatus(remoteAppDir);
-    } catch (FileNotFoundException fnf) {
-      html.h1()
-          ._("Logs not available for " + logEntity
-              + ". Aggregation may not be complete, "
-              + "Check back later or try the nodemanager at " + nodeId)._();
-      return;
-    } catch (Exception ex) {
-      html.h1()
-          ._("Error getting logs at " + nodeId)._();
-      return;
-    }
+    String remoteNodeLogDirSuffix =
+        LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+    boolean isCompacted = false;
     boolean foundLog = false;
-    String desiredLogType = $(CONTAINER_LOG_TYPE);
-    try {
-      while (nodeFiles.hasNext()) {
-        AggregatedLogFormat.LogReader reader = null;
-        try {
-          FileStatus thisNodeFile = nodeFiles.next();
-          if (!thisNodeFile.getPath().getName()
-              .contains(LogAggregationUtils.getNodeString(nodeId))
-              || thisNodeFile.getPath().getName()
-                  .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-            continue;
-          }
-          long logUploadedTime = thisNodeFile.getModificationTime();
-          reader =
-              new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
-
-          String owner = null;
-          Map<ApplicationAccessType, String> appAcls = null;
-          try {
-            owner = reader.getApplicationOwner();
-            appAcls = reader.getApplicationAcls();
-          } catch (IOException e) {
-            LOG.error("Error getting logs for " + logEntity, e);
-            continue;
-          }
-          ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
-          aclsManager.addApplication(applicationId, appAcls);
-
-          String remoteUser = request().getRemoteUser();
-          UserGroupInformation callerUGI = null;
-          if (remoteUser != null) {
-            callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-          }
-          if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
-              ApplicationAccessType.VIEW_APP, owner, applicationId)) {
-            html.h1()
-                ._("User [" + remoteUser
-                    + "] is not authorized to view the logs for " + logEntity
-                    + " in log file [" + thisNodeFile.getPath().getName() + "]")._();
-            LOG.error("User [" + remoteUser
-                + "] is not authorized to view the logs for " + logEntity);
-            continue;
-          }
-
-          AggregatedLogFormat.ContainerLogsReader logReader = reader
-              .getContainerLogsReader(containerId);
-          if (logReader == null) {
-            continue;
-          }
-
-          foundLog = readContainerLogs(html, logReader, logLimits,
-              desiredLogType, logUploadedTime);
-        } catch (IOException ex) {
-          LOG.error("Error getting logs for " + logEntity, ex);
-          continue;
-        } finally {
-          if (reader != null)
-            reader.close();
-        }
-      }
-      if (!foundLog) {
-        if (desiredLogType.isEmpty()) {
-          html.h1("No logs available for container " + containerId.toString());
-        } else {
-          html.h1("Unable to locate '" + desiredLogType
-              + "' log for container " + containerId.toString());
-        }
-      }
-    } catch (IOException e) {
-      html.h1()._("Error getting logs for " + logEntity)._();
-      LOG.error("Error getting logs for " + logEntity, e);
-    }
+    if (conf.getBoolean(YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED,
+        YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED)) {
+      Path logFile =
+          LogAggregationUtils.getRemoteCompactedAggregatedLogFileForApp(
+              remoteRootLogDir, applicationId, appOwner, remoteNodeLogDirSuffix);
+      Path indexFile =
+          LogAggregationUtils.getRemoteCompactedAggregatedLogIndexFileForApp(
+              remoteRootLogDir, applicationId, appOwner, remoteNodeLogDirSuffix);
+      CompactedAggregatedLogFormat.LogReader reader = null;
+      try {
+        FileStatus indexFileStatus =
+            FileContext.getFileContext(conf).getFileStatus(indexFile);
+        if (indexFileStatus.isFile()) {
+          isCompacted = true;
+          String desiredLogType = $(CONTAINER_LOG_TYPE);
+          long logUploadedTime = indexFileStatus.getModificationTime();
+          reader =
+              new CompactedAggregatedLogFormat.LogReader(
+                  conf, indexFile, logFile);
+
+          foundLog = readApplicationLogs(html, reader, logEntity,
+              applicationId, containerId, logFile, logLimits, desiredLogType,
+              logUploadedTime);
+        }
+      } catch (Exception e) {
+        LOG.error("Error getting compacted aggregated logs for " + logEntity
+            + "; will try to get aggregated logs", e);
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+
+    if (!isCompacted || !foundLog) {
+      Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
+          remoteRootLogDir, applicationId, appOwner, remoteNodeLogDirSuffix);
+      RemoteIterator<FileStatus> nodeFiles;
+      try {
+        Path qualifiedLogDir =
+            FileContext.getFileContext(conf).makeQualified(
+                remoteAppDir);
+        nodeFiles =
+            FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
+                .listStatus(remoteAppDir);
+      } catch (FileNotFoundException fnf) {
+        html.h1()
+            ._("Logs not available for " + logEntity
+                + ". Aggregation may not be complete, "
+                + "check back later or try the nodemanager at " + nodeId)._();
+        return;
+      } catch (Exception ex) {
+        html.h1()
+            ._("Error getting logs at " + nodeId)._();
+        return;
+      }
+
+      foundLog = false;
+      String desiredLogType = $(CONTAINER_LOG_TYPE);
+      try {
+        while (nodeFiles.hasNext()) {
+          AggregatedLogFormat.LogReader reader = null;
+          try {
+            FileStatus thisNodeFile = nodeFiles.next();
+            if (!thisNodeFile.getPath().getName()
+                .contains(LogAggregationUtils.getNodeString(nodeId))
+                || thisNodeFile.getPath().getName()
+                    .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+              continue;
+            }
+            long logUploadedTime = thisNodeFile.getModificationTime();
+            reader =
+                new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
+
+            foundLog = readApplicationLogs(html, reader, logEntity, applicationId,
+                containerId, thisNodeFile.getPath(), logLimits, desiredLogType,
+                logUploadedTime);
+          } catch (IOException ex) {
+            LOG.error("Error getting logs for " + logEntity, ex);
+            continue;
+          } finally {
+            if (reader != null) {
+              reader.close();
+            }
+          }
+        }
+
+        if (!foundLog) {
+          if (desiredLogType.isEmpty()) {
+            html.h1("No logs available for container " + containerId.toString());
+          } else {
+            html.h1("Unable to locate '" + desiredLogType
+                + "' log for container " + containerId.toString());
+          }
+        }
+      } catch (IOException e) {
+        html.h1()._("Error getting logs for " + logEntity)._();
+        LOG.error("Error getting logs for " + logEntity, e);
+      }
+    }
   }
 
+  private boolean readApplicationLogs(Block html, LogFormatReader reader,
+      String logEntity, ApplicationId applicationId, ContainerId containerId,
+      Path logFile, LogLimits logLimits, String desiredLogType,
+      long logUploadedTime) throws IOException {
+    String owner = null;
+    Map<ApplicationAccessType, String> appAcls = null;
+    try {
+      owner = reader.getApplicationOwner();
+      appAcls = reader.getApplicationAcls();
+    } catch (IOException e) {
+      LOG.error("Error getting logs for " + logEntity, e);
+      return false;
+    }
+    ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
+    aclsManager.addApplication(applicationId, appAcls);
+
+    String remoteUser = request().getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
+        ApplicationAccessType.VIEW_APP, owner, applicationId)) {
+      html.h1()
+          ._("User [" + remoteUser
+              + "] is not authorized to view the logs for " + logEntity
+              + " in log file [" + logFile.getName() + "]")._();
+      LOG.error("User [" + remoteUser
+          + "] is not authorized to view the logs for " + logEntity);
+      return false;
+    }
+
+    AggregatedLogFormat.ContainerLogsReader logReader = reader
+        .getContainerLogsReader(containerId);
+    if (logReader == null) {
+      return false;
+    }
+
+    return readContainerLogs(html, logReader, logLimits,
+        desiredLogType, logUploadedTime);
   }
 
   private boolean readContainerLogs(Block html,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 20887b6..4db4eb2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -35,6 +35,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
@@ -44,7 +46,6 @@
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -53,9 +54,11 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.CompactedAggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -118,6 +121,12 @@
   private final long rollingMonitorInterval;
   private final NodeId nodeId;
 
+  private boolean compactionEnabled;
+  private CuratorFramework curator;
+  private String compactedLogLock;
+  private Path remoteCompactedLogFileForApp;
+  private Path remoteCompactedLogIndexFileForApp;
+
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
 
@@ -191,6 +200,27 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
       }
       this.rollingMonitorInterval = configuredRollingMonitorInterval;
     }
+    compactionEnabled = false;
+  }
+
+  public AppLogAggregatorImpl(Dispatcher dispatcher,
+      DeletionService deletionService, Configuration conf,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
+      LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+      ContainerLogsRetentionPolicy retentionPolicy,
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs, CuratorFramework curator, String compactedLogLock,
+      Path remoteCompactedLogFileForApp,
+      Path remoteCompactedLogIndexFileForApp) {
+    this(dispatcher, deletionService, conf, appId, userUgi, nodeId, dirsHandler,
+        remoteNodeLogFileForApp, retentionPolicy, appAcls,
+        logAggregationContext, context, lfs);
+    this.curator = curator;
+    this.compactedLogLock = compactedLogLock;
+    this.remoteCompactedLogFileForApp = remoteCompactedLogFileForApp;
+    this.remoteCompactedLogIndexFileForApp = remoteCompactedLogIndexFileForApp;
+    compactionEnabled = true;
   }
 
   private void uploadLogsForContainers() {
@@ -381,11 +411,70 @@ public void run() {
     } finally {
       if (!this.appAggregationFinished.get()) {
         LOG.warn("Aggregation did not complete for application " + appId);
+      } else {
+        if (compactionEnabled) {
+          // Append aggregated log to compacted file
+          doAppendAggregatedLogToCompactedFile();
+        }
       }
       this.appAggregationFinished.set(true);
     }
   }
 
+  private void doAppendAggregatedLogToCompactedFile() {
+    LOG.info("Appending aggregated log to compacted aggregated log file for "
+        + appId);
+    InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curator,
+        compactedLogLock + "/" + appId);
+    boolean acquired = false;
+    try {
+      lock.acquire();
+      acquired = true;
+      // Append this node's log file
+      append(remoteNodeLogFileForApp, remoteCompactedLogFileForApp,
+          remoteCompactedLogIndexFileForApp);
+      // Delete the per-node aggregated log file now that it has been folded
+      // into the compacted file
+      userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws IOException {
+          FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
+          remoteFS.delete(remoteNodeLogFileForApp, false);
+          return null;
+        }
+      });
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage(), ioe);
+    } catch (Exception e) {
+      LOG.error("Error compacting aggregated log for " + appId + ": "
+          + e.getMessage(), e);
+    } finally {
+      // Only release the lock if it was actually acquired; releasing an
+      // unheld InterProcessSemaphoreMutex throws.
+      if (acquired) {
+        try {
+          lock.release();
+        } catch (Exception e) {
+          LOG.error("Error releasing lock for " + appId + ": " + e.getMessage(),
+              e);
+        }
+      }
+    }
+    LOG.info("Done appending aggregated log to compacted aggregated log file"
+        + " for " + appId);
+  }
+
+  private void append(Path sourceLogFile, Path compactedLogFile,
+      Path compactedLogIndex) throws IOException {
+    CompactedAggregatedLogFormat.LogWriter writer = null;
+    AggregatedLogFormat.LogReader reader = null;
+    try {
+      writer = new CompactedAggregatedLogFormat.LogWriter(conf,
+          compactedLogIndex, compactedLogFile, userUgi);
+      reader = new AggregatedLogFormat.LogReader(conf, sourceLogFile);
+      writer.append(reader);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private void doAppLogAggregation() {
     while (!this.appFinishing.get() && !this.aborted.get()) {
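Every NodeManager that ran containers for the application appends to the same compacted file pair, so the per-application InterProcessSemaphoreMutex above is what serializes the writers. A standalone sketch of the same Curator recipe, with an illustrative bounded wait instead of the patch's indefinite acquire() (the lock path and timeout are assumptions, not part of the patch):

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;

public class CompactionLockExample {
  // lockRoot would come from yarn.aggregated-log-compaction-lock-location.
  static void appendUnderLock(CuratorFramework curator, String lockRoot,
      String appId) throws Exception {
    InterProcessSemaphoreMutex lock =
        new InterProcessSemaphoreMutex(curator, lockRoot + "/" + appId);
    // Bounded wait: give up after 30s rather than blocking the thread forever.
    if (!lock.acquire(30, TimeUnit.SECONDS)) {
      throw new IOException("Timed out waiting for compaction lock of " + appId);
    }
    try {
      // Critical section: open the shared compacted file in append mode and
      // copy this node's aggregated log into it, as append(...) does above.
    } finally {
      lock.release();
    }
  }
}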
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index bd3e847..6ecea71 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -30,6 +30,13 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.ChildReaper;
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
@@ -97,6 +104,10 @@ Path remoteRootLogDir;
   String remoteRootLogDirSuffix;
   private NodeId nodeId;
+  private CuratorFramework curator;
+  private ChildReaper lockReaper;
+  private String compactedLogLock;
+  boolean compactionEnabled;
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
 
@@ -125,6 +136,42 @@ protected void serviceInit(Configuration conf) throws Exception {
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
 
+    if (conf.getBoolean(YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED,
+        YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED)) {
+      this.compactionEnabled = true;
+      // TODO: Curator security
+      String connectString = conf.get(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING,
+          YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING);
+      this.compactedLogLock = conf.get(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_LOCK_LOCATION,
+          YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_LOCATION);
+      if (this.compactedLogLock.endsWith("/")) {
+        this.compactedLogLock =
+            this.compactedLogLock.substring(0, this.compactedLogLock.length() - 1);
+      }
+      String compactedLogLockLeader = conf.get(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION,
+          YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION);
+      if (compactedLogLockLeader.endsWith("/")) {
+        compactedLogLockLeader =
+            compactedLogLockLeader.substring(0, compactedLogLockLeader.length() - 1);
+      }
+      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+      this.curator = CuratorFrameworkFactory.builder()
+          .connectString(connectString)
+          .retryPolicy(retryPolicy)
+          .build();
+      this.curator.start();
+      this.lockReaper = new ChildReaper(this.curator, this.compactedLogLock,
+          Reaper.Mode.REAP_INDEFINITELY,
+          ThreadUtils.newFixedThreadScheduledPool(2, "CompactedLogLockReaper"),
+          300, compactedLogLockLeader);
+      this.lockReaper.start();
+    } else {
+      this.compactionEnabled = false;
+    }
+
     super.serviceInit(conf);
   }
 
@@ -140,6 +187,12 @@ protected void serviceStart() throws Exception {
   protected void serviceStop() throws Exception {
     LOG.info(this.getName() + " waiting for pending aggregation during exit");
     stopAggregators();
+    if (lockReaper != null) {
+      lockReaper.close();
+    }
+    if (curator != null) {
+      curator.close();
+    }
     super.serviceStop();
   }
 
@@ -226,6 +279,18 @@ Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
     this.remoteRootLogDirSuffix);
   }
 
+  Path getRemoteCompactedLogFileForApp(ApplicationId appId, String user) {
+    return LogAggregationUtils.getRemoteCompactedAggregatedLogFileForApp(
+        this.remoteRootLogDir, appId, user,
+        this.remoteRootLogDirSuffix);
+  }
+
+  Path getRemoteCompactedLogIndexFileForApp(ApplicationId appId, String user) {
+    return LogAggregationUtils.getRemoteCompactedAggregatedLogIndexFileForApp(
+        this.remoteRootLogDir, appId, user,
+        this.remoteRootLogDirSuffix);
+  }
+
   Path getRemoteAppLogDir(ApplicationId appId, String user) {
     return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir,
         appId, user, this.remoteRootLogDirSuffix);
@@ -351,12 +416,24 @@ protected void initAppAggregator(final ApplicationId appId, String user,
     }
 
     // New application
-    final AppLogAggregator appLogAggregator =
-        new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
-            getConfig(), appId, userUgi, this.nodeId, dirsHandler,
-            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
-            appAcls, logAggregationContext, this.context,
-            getLocalFileContext(getConfig()));
+    final AppLogAggregator appLogAggregator;
+    if (compactionEnabled) {
+      appLogAggregator =
+          new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
+              getConfig(), appId, userUgi, this.nodeId, dirsHandler,
+              getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+              appAcls, logAggregationContext, this.context,
+              getLocalFileContext(getConfig()), curator, compactedLogLock,
+              getRemoteCompactedLogFileForApp(appId, user),
+              getRemoteCompactedLogIndexFileForApp(appId, user));
+    } else {
+      appLogAggregator =
+          new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
+              getConfig(), appId, userUgi, this.nodeId, dirsHandler,
+              getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+              appAcls, logAggregationContext, this.context,
+              getLocalFileContext(getConfig()));
+    }
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
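Taken together, the helpers above place the compacted data and index files directly under the application's existing remote log directory, beside the per-node aggregated files they replace. A small illustration using the new LogAggregationUtils methods; the root dir, user, and suffix values are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;

public class CompactedLogPaths {
  public static void main(String[] args) {
    Path root = new Path("/tmp/logs");
    ApplicationId appId = ApplicationId.newInstance(1400000000000L, 1);
    // /tmp/logs/alice/logs/application_1400000000000_0001/application_1400000000000_0001
    System.out.println(LogAggregationUtils
        .getRemoteCompactedAggregatedLogFileForApp(root, appId, "alice", "logs"));
    // Same path with the ".index" suffix appended.
    System.out.println(LogAggregationUtils
        .getRemoteCompactedAggregatedLogIndexFileForApp(root, appId, "alice", "logs"));
  }
}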