diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
new file mode 100644
index 0000000..4b087a1
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ChildReaper.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.curator.utils.PathUtils;
+
+// This is a copy of Curator 2.7.1's ChildReaper class, modified to work with
+// Guava 11.0.2. The problem is the 'paths' Collection, which calls Guava's
+// Sets.newConcurrentHashSet(), which was added in Guava 15.0.
+/**
+ * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
+ * the node and adds empty nodes to an internally managed {@link Reaper}
+ */
+public class ChildReaper implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Reaper reaper;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final CuratorFramework client;
+    private final Collection<String> paths = newConcurrentHashSet();
+    private final Reaper.Mode mode;
+    private final CloseableScheduledExecutorService executor;
+    private final int reapingThresholdMs;
+
+    private volatile Future<?> task;
+
+    // This is copied from Curator's Reaper class
+    static final int DEFAULT_REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
+
+    // This is copied from Guava
+    /**
+     * Creates a thread-safe set backed by a hash map. The set is backed by a
+     * {@link ConcurrentHashMap} instance, and thus carries the same concurrency
+     * guarantees.
+     *
+     * <p>Unlike {@code HashSet}, this class does NOT allow {@code null} to be
+     * used as an element. The set is serializable.
+     *
+     * @return a new, empty thread-safe {@code Set}
+     * @since 15.0
+     */
+    public static <E> Set<E> newConcurrentHashSet() {
+      return Sets.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
+    }
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param mode reaping mode
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
+    {
+        this(client, path, mode, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
+    }
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param mode reaping mode
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
+    {
+        this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
+    }
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param executor executor to use for background tasks
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param mode reaping mode
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
+    {
+        this(client, path, mode, executor, reapingThresholdMs, null);
+    }
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param executor executor to use for background tasks
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param mode reaping mode
+     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
+    {
+        this.client = client;
+        this.mode = mode;
+        this.executor = new CloseableScheduledExecutorService(executor);
+        this.reapingThresholdMs = reapingThresholdMs;
+        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
+        addPath(path);
+    }
+
+    /**
+     * The reaper must be started
+     *
+     * @throws Exception errors
+     */
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+
+        task = executor.scheduleWithFixedDelay
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        doWork();
+                    }
+                },
+                reapingThresholdMs,
+                reapingThresholdMs,
+                TimeUnit.MILLISECONDS
+            );
+
+        reaper.start();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            CloseableUtils.closeQuietly(reaper);
+            task.cancel(true);
+        }
+    }
+
+    /**
+     * Add a path to reap children from
+     *
+     * @param path the path
+     * @return this for chaining
+     */
+    public ChildReaper addPath(String path)
+    {
+        paths.add(PathUtils.validatePath(path));
+        return this;
+    }
+
+    /**
+     * Remove a path from reaping
+     *
+     * @param path the path
+     * @return true if the path existed and was removed
+     */
+    public boolean removePath(String path)
+    {
+        return paths.remove(PathUtils.validatePath(path));
+    }
+
+    private static ScheduledExecutorService newExecutorService()
+    {
+        return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
+    }
+
+    private
void doWork() + { + for ( String path : paths ) + { + try + { + List children = client.getChildren().forPath(path); + for ( String name : children ) + { + String thisPath = ZKPaths.makePath(path, name); + Stat stat = client.checkExists().forPath(thisPath); + if ( (stat != null) && (stat.getNumChildren() == 0) ) + { + reaper.addPath(thisPath, mode); + } + } + } + catch ( Exception e ) + { + log.error("Could not get children for path: " + path, e); + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index e0bbd7b..b251995 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -112,6 +112,11 @@ + + + + + @@ -288,6 +293,13 @@ + + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 05c6cbf..9943bf4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -757,10 +757,37 @@ private static void addDeprecatedKeys() { * The remote log dir will be created at * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} */ - public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = + public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; + /** Whether to enable aggregated log compaction */ + public static final String AGGREGATION_LOG_COMPACTION_ENABLED = YARN_PREFIX + + "aggregated-log-compaction-enable"; + public static final boolean DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED = + false; + + /** Aggregated log compaction ZooKeeper connection string */ + public static final String AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING = + YARN_PREFIX + "aggregated-log-compaction-zookeeper-connection-string"; + public static final String DEFAULT_AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING = + "localhost:2181"; + + /** Aggregated log compaction distributed lock location in ZooKeeper */ + public static final String AGGREGATION_LOG_COMPACTION_LOCK_LOCATION = + YARN_PREFIX + "aggregated-log-compaction-lock-location"; + public static final String DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_LOCATION = + "/yarn/logs/compaction"; + + /** + * Aggregated log compaction distributed lock reaper leader election location + * in ZooKeeper + * */ + public static final String AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION = + YARN_PREFIX + "aggregated-log-compaction-lock-reaper-leader-location"; + public static final String DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION = + "/yarn/logs/compaction_leader"; + public static final String YARN_LOG_SERVER_URL = YARN_PREFIX + "log.server.url"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index 4c1d152..811d5a5 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting; /** - * A service that periodically deletes aggregated logs. + * A service that periodically deletes aggregated and compacted aggregated logs. */ @InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"}) public class AggregatedLogDeletionService extends AbstractService { @@ -76,7 +76,7 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient @Override public void run() { long cutoffMillis = System.currentTimeMillis() - retentionMillis; - LOG.info("aggregated log deletion started."); + LOG.info("aggregated and compacted aggregated log deletion started."); try { FileSystem fs = remoteRootLogDir.getFileSystem(conf); for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { @@ -89,7 +89,7 @@ public void run() { logIOException("Error reading root log dir this deletion " + "attempt is being aborted", e); } - LOG.info("aggregated log deletion finished."); + LOG.info("aggregated and compacted aggregated log deletion finished."); } private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, @@ -103,7 +103,8 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, .getPath().getName()), rmClient); if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) { try { - LOG.info("Deleting aggregated logs in "+appDir.getPath()); + LOG.info("Deleting aggregated and compacted aggregated logs in " + +appDir.getPath()); fs.delete(appDir.getPath(), true); } catch (IOException e) { logIOException("Could not delete "+appDir.getPath(), e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index b669332..2ad1063 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -462,7 +462,7 @@ public void close() { @Public @Evolving - public static class LogReader { + public static class LogReader implements LogFormatReader { private final FSDataInputStream fsDataIStream; private final TFile.Reader.Scanner scanner; @@ -486,6 +486,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile) * @return the application owner. * @throws IOException */ + @Override public String getApplicationOwner() throws IOException { TFile.Reader.Scanner ownerScanner = reader.createScanner(); LogKey key = new LogKey(); @@ -508,6 +509,7 @@ public String getApplicationOwner() throws IOException { * @return a map of the Application ACLs. * @throws IOException */ + @Override public Map getApplicationAcls() throws IOException { // TODO Seek directly to the key once a comparator is specified. 
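(Aside, a minimal sketch, not part of this patch, of what extracting the LogFormatReader interface buys: code written against it reads a container's logs the same way whether they come from a per-node aggregated TFile or from the compacted per-application file. The LogFormatReaderExample class and dumpContainerLogs helper below are hypothetical names; the ContainerLogsReader calls match the ones the patch's tests use.)

import java.io.IOException;
import java.io.Writer;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogFormatReader;

class LogFormatReaderExample {
  /** Dumps every log type for one container, whichever reader implementation is passed. */
  static void dumpContainerLogs(LogFormatReader reader, ContainerId containerId,
      Writer out) throws IOException {
    AggregatedLogFormat.ContainerLogsReader logReader =
        reader.getContainerLogsReader(containerId);
    if (logReader == null) {
      return; // this container's logs are not present in the file
    }
    char[] buf = new char[64 * 1024];
    while (logReader.nextLog() != null) {
      out.write("LogType: " + logReader.getCurrentLogType() + "\n");
      long remaining = logReader.getCurrentLogLength();
      while (remaining > 0) {
        int n = logReader.read(buf, 0, (int) Math.min(buf.length, remaining));
        if (n <= 0) {
          break;
        }
        out.write(buf, 0, n);
        remaining -= n;
      }
    }
  }
}

AggregatedLogsBlock.readApplicationLogs(), later in this patch, follows the same pattern: it takes a LogFormatReader and is called first with the compacted reader, then with the per-node reader as a fallback.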
@@ -579,6 +581,7 @@ public DataInputStream next(LogKey key) throws IOException { * @throws IOException */ @Private + @Override public ContainerLogsReader getContainerLogsReader( ContainerId containerId) throws IOException { ContainerLogsReader logReader = null; @@ -742,6 +745,7 @@ public static void readAContainerLogsForALogType( readAContainerLogsForALogType(valueStream, out, -1); } + @Override public void close() { IOUtils.cleanup(LOG, scanner, reader, fsDataIStream); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java new file mode 100644 index 0000000..ad57bc4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CompactedAggregatedLogFormat.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CompactedAggregatedLogFormat {
+
+  // Log against this class, not AggregatedLogFormat, so that messages are
+  // attributed to the compacted format.
+  private static final Log LOG =
+      LogFactory.getLog(CompactedAggregatedLogFormat.class);
+  private static final int CURRENT_VERSION = 1;
+  private static final String DELIMITER = ",";
+  private static final String LINE_DELIMITER = System.lineSeparator();
+
+  private static StreamCopier streamCopier;
+
+  static {
+    // Allows us to inject failures while copying streams for testing
+    if (System.getProperty("TestCompactedAggregatedLogFormat", "false")
+        .equals("true")) {
+      streamCopier = new StreamCopierForTest();
+    } else {
+      streamCopier = new StreamCopier();
+    }
+  }
+
+  /**
+   * Umask for the log file.
+ */
+  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+      .createImmutable((short) (0640 ^ 0777));
+
+  public CompactedAggregatedLogFormat() {
+  }
+
+  public static class LogWriter {
+
+    private final FSDataOutputStream fsDataOStream;
+    private final OutputStreamWriter indexWriter;
+    private FileContext fc;
+    private long currentOffset;
+    private int indexLineCount;
+
+    public LogWriter(final Configuration conf, final Path indexFile,
+        final Path logFile, final UserGroupInformation userUgi)
+        throws IOException {
+      try {
+        this.fc =
+            userUgi.doAs(new PrivilegedExceptionAction<FileContext>() {
+              @Override
+              public FileContext run() throws Exception {
+                fc = FileContext.getFileContext(conf);
+                fc.setUMask(APP_LOG_FILE_UMASK);
+                return fc;
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+
+      try {
+        currentOffset = fc.getFileStatus(logFile).getLen();
+
+        // Verify the header is not missing any lines
+        if (currentOffset > 0L) {
+          Scanner indexScanner = null;
+          try {
+            indexScanner = new Scanner(fc.open(indexFile), "utf-8");
+            indexLineCount = 0;
+            while (indexLineCount < 3 && indexScanner.hasNextLine()) {
+              indexScanner.nextLine();
+              indexLineCount++;
+            }
+          } finally {
+            if (indexScanner != null) {
+              indexScanner.close();
+            }
+          }
+        }
+      } catch (FileNotFoundException fnfe) {
+        currentOffset = 0L;
+        indexLineCount = 0;
+      }
+
+      this.fsDataOStream = fc.create(
+          logFile,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+          new Options.CreateOpts[]{});
+
+      indexWriter = new OutputStreamWriter(fc.create(
+          indexFile,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+          new Options.CreateOpts[]{}), "utf-8");
+    }
+
+    private void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
+        throws IOException {
+      indexWriter.write(LINE_DELIMITER + appAcls.size());
+      for (Map.Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
+        indexWriter.write(DELIMITER + entry.getKey()
+            + DELIMITER + entry.getValue());
+      }
+    }
+
+    public void append(AggregatedLogFormat.LogReader reader) throws IOException {
+      // Resume a partially written header: indexLineCount holds how many of
+      // the three header lines (version, ACLs, owner) already exist, and
+      // each case intentionally falls through to write the remaining ones.
+      if (indexLineCount < 3) {
+        switch (indexLineCount) {
+          case 0:
+            indexWriter.write(Integer.toString(CURRENT_VERSION));
+            indexLineCount++;
+          case 1:
+            writeApplicationACLs(reader.getApplicationAcls());
+            indexLineCount++;
+          case 2:
+            indexWriter.write(LINE_DELIMITER + reader.getApplicationOwner());
+            indexLineCount++;
+          default:
+            break;
+        }
+      }
+
+      AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
+      DataInputStream valueStream = reader.next(key);
+      while (valueStream != null) {
+        indexWriter.write(LINE_DELIMITER + key);
+        long length = streamCopier.copy(valueStream, fsDataOStream);
+        indexWriter.write(DELIMITER + currentOffset + DELIMITER + length);
+        currentOffset += length;
+
+        // Next container
+        key = new AggregatedLogFormat.LogKey();
+        valueStream = reader.next(key);
+      }
+    }
+
+    public void close() {
+      org.apache.hadoop.io.IOUtils.closeStream(indexWriter);
+      org.apache.hadoop.io.IOUtils.closeStream(fsDataOStream);
+    }
+  }
+
+  public static class LogReader implements LogFormatReader {
+
+    private final FSDataInputStream fsDataIStream;
+    private Map<ApplicationAccessType, String> acls;
+    private String owner;
+    private Map<String, IndexEntry> index;
+
+    public LogReader(Configuration conf, Path indexFile, Path logFile) throws IOException {
+      FileContext fileContext = FileContext.getFileContext(conf);
+
+      this.fsDataIStream = fileContext.open(logFile);
+
+      Scanner indexScanner = null;
+      try {
+        indexScanner = new Scanner(fileContext.open(indexFile), "utf-8");
+        loadIndexFile(indexScanner);
+      } finally {
+
org.apache.hadoop.io.IOUtils.closeStream(indexScanner); + } + } + + @Override + public Map getApplicationAcls() { + return acls; + } + + @Override + public String getApplicationOwner() { + return owner; + } + + private void readApplicationACLs(Scanner indexScanner) throws IOException { + String line = indexScanner.nextLine(); + Scanner sc = new Scanner(line); + sc.useDelimiter(DELIMITER); + int numAcls = sc.nextInt(); + acls = new HashMap(numAcls); + for (int i = 0; i < numAcls; i++) { + String appAccessOp = sc.next(); + String aclString = sc.next(); + acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString); + } + } + + private void loadIndexFile(Scanner indexScanner) throws IOException { + String line = indexScanner.nextLine(); + int version = Integer.parseInt(line); + if (version != CURRENT_VERSION) { + throw new IOException("Incorrect version: expected " + CURRENT_VERSION + + " but found " + version); + } + readApplicationACLs(indexScanner); + owner = indexScanner.nextLine(); + + index = new HashMap(); + while (indexScanner.hasNextLine()) { + Scanner sc = new Scanner(indexScanner.nextLine()); + sc.useDelimiter(DELIMITER); + try { + String containerId = sc.next(); + long offset = sc.nextLong(); + long length = sc.nextLong(); + index.put(containerId, new IndexEntry(offset, length)); + } catch (Exception e) { + // this entry is invalid or incomplete; skip it + LOG.debug("Invalid index entry; skipping", e); + } + } + } + + @VisibleForTesting + public Set getContainerIds() { + return index.keySet(); + } + + /** + * Get a ContainerLogsReader to read the logs for + * the specified container. + * + * @param containerId + * @return object to read the container's logs or null if the + * logs could not be found + * @throws IOException + */ + @InterfaceAudience.Private + @Override + public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException { + AggregatedLogFormat.ContainerLogsReader logReader = null; + IndexEntry iEntry = index.get(containerId.toString()); + if (iEntry != null) { + fsDataIStream.seek(iEntry.getOffset()); + BoundedInputStream boundedFsDataIStream = new BoundedInputStream(fsDataIStream, iEntry.getLength()); + DataInputStream valueStream = new DataInputStream(boundedFsDataIStream); + if (valueStream != null) { + logReader = new AggregatedLogFormat.ContainerLogsReader(valueStream); + } + } + return logReader; + } + + @Override + public void close() { + org.apache.hadoop.io.IOUtils.closeStream(fsDataIStream); + } + } + + private static class IndexEntry { + private long offset; + private long length; + + public IndexEntry(long offset, long length) { + this.offset = offset; + this.length = length; + } + + public long getOffset() { + return offset; + } + + public long getLength() { + return length; + } + } + + protected static class StreamCopier { + public long copy(InputStream input, OutputStream output) + throws IOException { + return IOUtils.copyLarge(input, output); + } + } + + protected static class StreamCopierForTest extends StreamCopier { + public long copy(InputStream input, OutputStream output) + throws IOException { + String cause = System.getProperty("TestCompactedAggregatedLogFormat_Cause"); + if (cause != null) { + if (cause.equals("IOException")) { + throw new IOException("Injected IO Failure!"); + } else if (cause.equals("Exit")) { + System.exit(1); + } + } + return IOUtils.copyLarge(input, output); + } + } +} diff --git 
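(Aside, not part of the patch: a concrete picture of the index file that LogWriter maintains and LogReader.loadIndexFile() parses. It is plain UTF-8 text: line 1 is the format version, line 2 the ACL count followed by comma-separated type/ACL pairs, line 3 the application owner, and each later line maps one container to a byte range in the data file as containerId,offset,length. A hypothetical index for two appended containers, with made-up owner and lengths, would look like:

1
1,VIEW_APP,hadoop
hadoop
container_0_0001_01_000001,0,27
container_0_0001_01_000002,27,31

Because the reader skips any entry that fails to parse, and the writer re-counts the three header lines when it reopens the index, a writer that crashed mid-append leaves a file that is still readable and can be appended to again; this is exactly what testIncompleteIndexReaderWriter below exercises.)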
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 34c9100..384f68a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -100,6 +100,17 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) { YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); } + public static Path getRemoteCompactedAggregatedLogFileForApp( + Path remoteRootLogDir,ApplicationId appId, String user, String suffix) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), + appId.toString()); + } + + public static Path getRemoteCompactedAggregatedLogIndexFileForApp( + Path remoteRootLogDir, ApplicationId appId, String user, String suffix) { + return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix), + appId.toString() + ".index"); + } /** * Converts a nodeId to a form used in the app log file name. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java new file mode 100644 index 0000000..6ac440a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.io.IOException; +import java.util.Map; + +public interface LogFormatReader { + + public String getApplicationOwner() throws IOException; + + public Map getApplicationAcls() + throws IOException; + + public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException; + + public void close(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 620d097..19b42fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -41,7 +41,9 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.CompactedAggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.LogFormatReader; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Times; @@ -90,102 +92,149 @@ protected void render(Block html) { Path remoteRootLogDir = new Path(conf.get( YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, applicationId, appOwner, - LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)); - RemoteIterator nodeFiles; - try { - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified( - remoteAppDir); - nodeFiles = - FileContext.getFileContext(qualifiedLogDir.toUri(), conf) - .listStatus(remoteAppDir); - } catch (FileNotFoundException fnf) { - html.h1() - ._("Logs not available for " + logEntity - + ". 
Aggregation may not be complete, " - + "Check back later or try the nodemanager at " + nodeId)._(); - return; - } catch (Exception ex) { - html.h1() - ._("Error getting logs at " + nodeId)._(); - return; - } + String remoteNodeLogDirSuffix = + LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + boolean isCompacted = false; boolean foundLog = false; - String desiredLogType = $(CONTAINER_LOG_TYPE); - try { - while (nodeFiles.hasNext()) { - AggregatedLogFormat.LogReader reader = null; - try { - FileStatus thisNodeFile = nodeFiles.next(); - if (!thisNodeFile.getPath().getName() - .contains(LogAggregationUtils.getNodeString(nodeId)) - || thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - continue; - } - long logUploadedTime = thisNodeFile.getModificationTime(); + if (conf.getBoolean(YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED, + YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED)) { + Path logFile = + LogAggregationUtils.getRemoteCompactedAggregatedLogFileForApp( + remoteRootLogDir, applicationId, appOwner, remoteNodeLogDirSuffix); + Path indexFile = + LogAggregationUtils.getRemoteCompactedAggregatedLogIndexFileForApp( + remoteRootLogDir, applicationId, appOwner, remoteNodeLogDirSuffix); + CompactedAggregatedLogFormat.LogReader reader = null; + try { + FileStatus indexFileStatus = + FileContext.getFileContext(conf).getFileStatus(indexFile); + if (indexFileStatus.isFile()) { + isCompacted = true; + String desiredLogType = $(CONTAINER_LOG_TYPE); + long logUploadedTime = indexFileStatus.getModificationTime(); reader = - new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); + new CompactedAggregatedLogFormat.LogReader( + conf, indexFile, logFile); - String owner = null; - Map appAcls = null; - try { - owner = reader.getApplicationOwner(); - appAcls = reader.getApplicationAcls(); - } catch (IOException e) { - LOG.error("Error getting logs for " + logEntity, e); - continue; - } - ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf); - aclsManager.addApplication(applicationId, appAcls); + foundLog = readApplicationLogs(html, reader, logEntity, + applicationId, containerId, logFile, logLimits, desiredLogType, + logUploadedTime); + } + } catch (Exception e) { + LOG.error("Error getting compacted aggregated logs for " + logEntity + + "; will try to get aggregated logs", e); + } finally { + if (reader != null) + reader.close(); + } + } - String remoteUser = request().getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - if (callerUGI != null && !aclsManager.checkAccess(callerUGI, - ApplicationAccessType.VIEW_APP, owner, applicationId)) { - html.h1() - ._("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity - + " in log file [" + thisNodeFile.getPath().getName() + "]")._(); - LOG.error("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity); - continue; - } + if (!isCompacted || !foundLog) { + Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogDir, applicationId, appOwner, remoteNodeLogDirSuffix); + RemoteIterator nodeFiles; + try { + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified( + remoteAppDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), conf) + .listStatus(remoteAppDir); + } catch (FileNotFoundException fnf) { + html.h1() + ._("Logs not available for " + logEntity + + ". 
Aggregation may not be complete, " + + "Check back later or try the nodemanager at " + nodeId)._(); + return; + } catch (Exception ex) { + html.h1() + ._("Error getting logs at " + nodeId)._(); + return; + } - AggregatedLogFormat.ContainerLogsReader logReader = reader - .getContainerLogsReader(containerId); - if (logReader == null) { + foundLog = false; + String desiredLogType = $(CONTAINER_LOG_TYPE); + try { + while (nodeFiles.hasNext()) { + AggregatedLogFormat.LogReader reader = null; + try { + FileStatus thisNodeFile = nodeFiles.next(); + if (!thisNodeFile.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + || thisNodeFile.getPath().getName() + .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + continue; + } + long logUploadedTime = thisNodeFile.getModificationTime(); + reader = + new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); + + foundLog = readApplicationLogs(html, reader, logEntity, applicationId, + containerId, thisNodeFile.getPath(), logLimits, desiredLogType, + logUploadedTime); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); continue; + } finally { + if (reader != null) + reader.close(); } - - foundLog = readContainerLogs(html, logReader, logLimits, - desiredLogType, logUploadedTime); - } catch (IOException ex) { - LOG.error("Error getting logs for " + logEntity, ex); - continue; - } finally { - if (reader != null) - reader.close(); } - } - if (!foundLog) { - if (desiredLogType.isEmpty()) { - html.h1("No logs available for container " + containerId.toString()); - } else { - html.h1("Unable to locate '" + desiredLogType - + "' log for container " + containerId.toString()); + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } } + } catch (IOException e) { + html.h1()._("Error getting logs for " + logEntity)._(); + LOG.error("Error getting logs for " + logEntity, e); } + } + } + + private boolean readApplicationLogs(Block html, LogFormatReader reader, + String logEntity, ApplicationId applicationId, ContainerId containerId, + Path logFile, LogLimits logLimits, String desiredLogType, long logUploadedTime) throws IOException { + String owner = null; + Map appAcls = null; + try { + owner = reader.getApplicationOwner(); + appAcls = reader.getApplicationAcls(); } catch (IOException e) { - html.h1()._("Error getting logs for " + logEntity)._(); LOG.error("Error getting logs for " + logEntity, e); + return false; + } + ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf); + aclsManager.addApplication(applicationId, appAcls); + + String remoteUser = request().getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); } + if (callerUGI != null && !aclsManager.checkAccess(callerUGI, + ApplicationAccessType.VIEW_APP, owner, applicationId)) { + html.h1() + ._("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity + + " in log file [" + logFile.getName() + "]")._(); + LOG.error("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity); + return false; + } + + AggregatedLogFormat.ContainerLogsReader logReader = reader + .getContainerLogsReader(containerId); + if (logReader == null) { + return false; + } + + return readContainerLogs(html, logReader, logLimits, + 
desiredLogType, logUploadedTime); } private boolean readContainerLogs(Block html, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 2a5762c..b5672a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.webapp.view.BlockForTest; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest; +import org.junit.Before; import org.junit.Test; import static org.mockito.Mockito.*; @@ -58,16 +59,24 @@ * */ public class TestAggregatedLogsBlock { + + private static final File testWorkDir = new File("target", + "TestAggregatedLogsBlock"); + + @Before + public void setup() { + FileUtil.fullyDelete(testWorkDir); + } + /** * Bad user. User 'owner' is trying to read logs without access */ @Test public void testAccessDenied() throws Exception { - - FileUtil.fullyDelete(new File("target/logs")); Configuration configuration = getConfiguration(); - writeLogs("target/logs/logs/application_0_0001/container_0_0001_01_000001"); + writeLogs(testWorkDir.toString() + + "/logs/logs/application_0_0001/container_0_0001_01_000001"); writeLog(configuration, "owner"); @@ -93,11 +102,10 @@ public void testAccessDenied() throws Exception { */ @Test public void testBadLogs() throws Exception { - - FileUtil.fullyDelete(new File("target/logs")); Configuration configuration = getConfiguration(); - writeLogs("target/logs/logs/application_0_0001/container_0_0001_01_000001"); + writeLogs(testWorkDir.toString() + + "/logs/logs/application_0_0001/container_0_0001_01_000001"); writeLog(configuration, "owner"); @@ -123,11 +131,10 @@ public void testBadLogs() throws Exception { */ @Test public void testAggregatedLogsBlock() throws Exception { - - FileUtil.fullyDelete(new File("target/logs")); Configuration configuration = getConfiguration(); - writeLogs("target/logs/logs/application_0_0001/container_0_0001_01_000001"); + writeLogs(testWorkDir.toString() + + "/logs/logs/application_0_0001/container_0_0001_01_000001"); writeLog(configuration, "admin"); @@ -152,11 +159,10 @@ public void testAggregatedLogsBlock() throws Exception { */ @Test public void testNoLogs() throws Exception { - - FileUtil.fullyDelete(new File("target/logs")); Configuration configuration = getConfiguration(); - File f = new File("target/logs/logs/application_0_0001/container_0_0001_01_000001"); + File f = new File(testWorkDir, + "logs/logs/application_0_0001/container_0_0001_01_000001"); if (!f.exists()) { assertTrue(f.mkdirs()); } @@ -175,27 +181,110 @@ public void testNoLogs() throws Exception { assertTrue(out.contains("No logs available for container container_0_0001_01_000001")); } + + /** + * Test reading from compacted logs and aggregated logs + * + * @throws Exception + */ + @Test + public void testCompactedLogs() throws Exception { + Configuration configuration = getConfiguration(); + + writeLogs(testWorkDir.toString() + + "/logs/logs/application_0_0001/container_0_0001_01_000001"); + String c1Aggregated = writeLog(configuration, "admin", 1, "host1_1234"); + + writeLogs(testWorkDir.toString() + + 
"/logs/logs/application_0_0001/container_0_0001_01_000002"); + writeLog(configuration, "admin", 2, "host2_1234"); + + // Put container_0_0001_01_000001's aggregated logs into the compacted log + // file and delete it + File compactedLogDir = new File(testWorkDir, + "logs/admin/logs/application_0_0001"); + Path indexFile = + new Path(compactedLogDir.toString(), "application_0_0001.index"); + Path logFile = + new Path(compactedLogDir.toString(), "application_0_0001"); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + CompactedAggregatedLogFormat.LogWriter cWriter = null; + AggregatedLogFormat.LogReader aReader = null; + try { + cWriter = new CompactedAggregatedLogFormat.LogWriter(configuration, + indexFile, logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(configuration, + new Path(c1Aggregated)); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + File c1AggregatedFile = new File(c1Aggregated); + assertTrue(c1AggregatedFile.delete()); + + // Should read from compacted log file + AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( + configuration, "admin", "container_0_0001_01_000001", "host1:1234"); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + PrintWriter printWriter = new PrintWriter(data); + HtmlBlock html = new HtmlBlockForTest(); + HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); + aggregatedBlock.render(block); + block.getWriter().flush(); + String out = data.toString(); + assertTrue(out.contains("test log1")); + assertTrue(out.contains("test log2")); + assertTrue(out.contains("test log3")); + + // Not in compacted log file, but can read from aggregated log file + aggregatedBlock = getAggregatedLogsBlockForTest( + configuration, "admin", "container_0_0001_01_000002", "host2:1234"); + data = new ByteArrayOutputStream(); + printWriter = new PrintWriter(data); + html = new HtmlBlockForTest(); + block = new BlockForTest(html, printWriter, 10, false); + aggregatedBlock.render(block); + block.getWriter().flush(); + out = data.toString(); + assertTrue(out.contains("test log1")); + assertTrue(out.contains("test log2")); + assertTrue(out.contains("test log3")); + } private Configuration getConfiguration() { Configuration configuration = new Configuration(); configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); - configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs"); + configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + testWorkDir.toString() + "/logs"); configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); + configuration.setBoolean( + YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED, true); return configuration; } private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( Configuration configuration, String user, String containerId) { + return getAggregatedLogsBlockForTest(configuration, user, containerId, + "localhost:1234"); + } + + private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( + Configuration configuration, String user, String containerId, + String node) { HttpServletRequest request = mock(HttpServletRequest.class); when(request.getRemoteUser()).thenReturn(user); AggregatedLogsBlockForTest aggregatedBlock = new AggregatedLogsBlockForTest( configuration); aggregatedBlock.setRequest(request); aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID, 
containerId); - aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME, - "localhost:1234"); + aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME, node); aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user); aggregatedBlock.moreParams().put("start", ""); aggregatedBlock.moreParams().put("end", ""); @@ -203,19 +292,25 @@ private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( return aggregatedBlock; } - private void writeLog(Configuration configuration, String user) + private String writeLog(Configuration configuration, String user) + throws Exception { + return writeLog(configuration, user, 1, "localhost_1234"); + } + + private String writeLog(Configuration configuration, String user, int cid, String node) throws Exception { ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptIdPBImpl.newInstance(appId, 1); - ContainerId containerId = ContainerIdPBImpl.newContainerId(appAttemptId, 1); + ContainerId containerId = ContainerIdPBImpl.newContainerId(appAttemptId, cid); - String path = "target/logs/" + user - + "/logs/application_0_0001/localhost_1234"; + String path = testWorkDir.toString() + + "/logs/" + user + "/logs/application_0_0001/" + node; File f = new File(path); if (!f.getParentFile().exists()) { assertTrue(f.getParentFile().mkdirs()); } - List rootLogDirs = Arrays.asList("target/logs/logs"); + List rootLogDirs = + Arrays.asList(testWorkDir.toString() + "/logs/logs"); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter( @@ -226,9 +321,10 @@ private void writeLog(Configuration configuration, String user) appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); writer.writeApplicationACLs(appAcls); - writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"), + writer.append(new AggregatedLogFormat.LogKey(containerId.toString()), new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName())); writer.close(); + return path; } private void writeLogs(String dirName) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCompactedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCompactedAggregatedLogFormat.java new file mode 100644 index 0000000..30ed83f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCompactedAggregatedLogFormat.java @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.TestContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +public class TestCompactedAggregatedLogFormat { + + private static final File testWorkDir = new File("target", + "TestCompactedAggregatedLogFormat"); + private static final Configuration conf = new Configuration(); + private static final FileSystem fs; + + static { + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Allows us to inject failures while copying streams + System.setProperty("TestCompactedAggregatedLogFormat", "true"); + } + + @Before + public void setup() throws Exception { + Path workDirPath = new Path(testWorkDir.getAbsolutePath()); + if (fs.exists(workDirPath)) { + fs.delete(workDirPath, true); + } + fs.mkdirs(workDirPath); + } + + @After + public void cleanup() throws Exception { + Path workDirPath = new Path(testWorkDir.getAbsolutePath()); + fs.delete(workDirPath, true); + } + + /** + * This test creates local log files for 2 containers, aggregates them, then + * compacts them, and finally reads the compacted aggregated log file. It + * then creates local log files for a 3rd container, repeats the process for + * it, and verifies that all 3 containers' log files are in the compacted + * log file. 
+ * + * @throws Exception + */ + @Test + public void testReaderWriter() throws Exception { + File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs"); + Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(), + "logs"); + ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1); + ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2); + ContainerId containerIdC = TestContainerId.newContainerId(1, 1, 1, 3); + + Path remoteAppLogFileA = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdA, + "hostA_1234"); + Path remoteAppLogFileB = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdB, + "hostB_1234"); + + Path indexFile = + new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString() + + ".index"); + Path logFile = new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + CompactedAggregatedLogFormat.LogWriter cWriter = null; + AggregatedLogFormat.LogReader aReader = null; + try { + cWriter = new CompactedAggregatedLogFormat.LogWriter(conf, indexFile, + logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileA); + cWriter.append(aReader); + aReader.close(); + aReader = null; + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileB); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + + CompactedAggregatedLogFormat.LogReader cReader = null; + try { + cReader = + new CompactedAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(2, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdA.toString())); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + for (ContainerId containerId : + new ContainerId[]{containerIdA, containerIdB}) { + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerId); + int i = 0; + while (logReader.nextLog() != null) { + i++; + Assert.assertEquals("log" + i, logReader.getCurrentLogType()); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + Assert.assertEquals("test log" + i, new String(buf)); + } + Assert.assertEquals(i, 3); + } + } finally { + if (cReader != null) { + cReader.close(); + } + } + + Path remoteAppLogFileC = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdC, + "hostC_1234"); + cWriter = null; + aReader = null; + try { + cWriter = new CompactedAggregatedLogFormat.LogWriter(conf, indexFile, + logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileC); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + + cReader = null; + try { + cReader = + new CompactedAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + 
Assert.assertEquals(3, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdA.toString())); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + Assert.assertTrue(containerIds.contains(containerIdC.toString())); + for (ContainerId containerId : + new ContainerId[]{containerIdA, containerIdB, containerIdC}) { + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerId); + int i = 0; + while (logReader.nextLog() != null) { + i++; + Assert.assertEquals("log" + i, logReader.getCurrentLogType()); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + Assert.assertEquals("test log" + i, new String(buf)); + } + Assert.assertEquals(i, 3); + } + } finally { + if (cReader != null) { + cReader.close(); + } + } + } + + /** + * This test creates local log files for 1 container, aggregates them, then + * compacts them, then injects an IO failure while copying the data, and + * finally reads the compacted aggregated log file (verifying that the + * container is effectively not there). It then creates local log files + * for a 2nd container, repeats the process for it (without the IO failure), + * and verifies that the second container's log files are in the compacted log + * file. + * + * @throws Exception + */ + @Test + public void testIncompleteIndexReaderWriter() throws Exception { + File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs"); + Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(), + "logs"); + ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1); + ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2); + + Path remoteAppLogFileA = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdA, + "hostA_1234"); + + Path indexFile = + new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString() + + ".index"); + Path logFile = new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + CompactedAggregatedLogFormat.LogWriter cWriter = null; + AggregatedLogFormat.LogReader aReader = null; + try { + cWriter = new CompactedAggregatedLogFormat.LogWriter(conf, indexFile, + logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileA); + // Inject IO Failure + System.setProperty("TestCompactedAggregatedLogFormat_Cause", "IOException"); + try { + cWriter.append(aReader); + } catch (IOException ioe) { + if (!ioe.getMessage() + .equals("Injected IO Failure!")) { + throw ioe; + } + } + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + System.setProperty("TestCompactedAggregatedLogFormat_Cause", "None"); + } + + // Verify that the index entry is incomplete + File iFile = new File(testWorkDir.getAbsolutePath(), indexFile.getName()); + StringBuilder sb = new StringBuilder(); + Scanner sc = null; + try { + sc = new Scanner(iFile); + String line = sc.nextLine(); + Assert.assertEquals("1", line); + sb.append(line).append(System.lineSeparator()); + line = sc.nextLine(); + Assert.assertEquals("1,VIEW_APP," + ugi.getUserName(), line); + sb.append(line).append(System.lineSeparator()); + line = sc.nextLine(); + Assert.assertEquals(ugi.getUserName(), line); + sb.append(line).append(System.lineSeparator()); + line = sc.nextLine(); + 
Assert.assertEquals(containerIdA.toString(), line); + Assert.assertFalse(sc.hasNextLine()); + } finally { + if (sc != null) { + sc.close(); + } + } + + CompactedAggregatedLogFormat.LogReader cReader = null; + try { + cReader = new CompactedAggregatedLogFormat.LogReader(conf, indexFile, + logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(0, containerIds.size()); + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerIdA); + Assert.assertNull(logReader); + } finally { + if (cReader != null) { + cReader.close(); + } + } + + Path remoteAppLogFileB = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdB, + "hostB_1234"); + + cWriter = null; + aReader = null; + try { + cWriter = + new CompactedAggregatedLogFormat.LogWriter(conf, indexFile, logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileB); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + + cReader = null; + try { + cReader = + new CompactedAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(1, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerIdB); + int i = 0; + while (logReader.nextLog() != null) { + i++; + Assert.assertEquals("log" + i, logReader.getCurrentLogType()); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + Assert.assertEquals("test log" + i, new String(buf)); + } + Assert.assertEquals(i, 3); + } finally { + if (cReader != null) { + cReader.close(); + } + } + } + + private Path writeAggregatedLog(File rootLogDirs, + Path remoteAggregatedLogDirs, ContainerId containerId, String node) + throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Path remoteAppDir = new Path(remoteAggregatedLogDirs, + containerId.getApplicationAttemptId().getApplicationId().toString()); + if (!fs.exists(remoteAppDir)) { + assertTrue(fs.mkdirs(remoteAppDir)); + } + Path remoteAppLogFile = new Path(remoteAppDir, node); + AggregatedLogFormat.LogWriter writer = null; + try { + writer = new AggregatedLogFormat.LogWriter(conf, remoteAppLogFile, ugi); + AggregatedLogFormat.LogKey logKey = + new AggregatedLogFormat.LogKey(containerId); + writer.writeApplicationOwner(ugi.getUserName()); + Map appAcls = + new HashMap(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + File appLogDir = new File(rootLogDirs, + containerId.getApplicationAttemptId().getApplicationId().toString()); + File containerLogDir = new File(appLogDir, containerId.toString()); + writeLogs(containerLogDir.getAbsolutePath()); + AggregatedLogFormat.LogValue logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerId, + ugi.getUserName()); + writer.append(logKey, logValue); + } finally { + if (writer != null) { + writer.close(); + } + } + 
+  private Path writeAggregatedLog(File rootLogDirs,
+      Path remoteAggregatedLogDirs, ContainerId containerId, String node)
+      throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Path remoteAppDir = new Path(remoteAggregatedLogDirs,
+        containerId.getApplicationAttemptId().getApplicationId().toString());
+    if (!fs.exists(remoteAppDir)) {
+      assertTrue(fs.mkdirs(remoteAppDir));
+    }
+    Path remoteAppLogFile = new Path(remoteAppDir, node);
+    AggregatedLogFormat.LogWriter writer = null;
+    try {
+      writer = new AggregatedLogFormat.LogWriter(conf, remoteAppLogFile, ugi);
+      AggregatedLogFormat.LogKey logKey =
+          new AggregatedLogFormat.LogKey(containerId);
+      writer.writeApplicationOwner(ugi.getUserName());
+      Map<ApplicationAccessType, String> appAcls =
+          new HashMap<ApplicationAccessType, String>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+      File appLogDir = new File(rootLogDirs,
+          containerId.getApplicationAttemptId().getApplicationId().toString());
+      File containerLogDir = new File(appLogDir, containerId.toString());
+      writeLogs(containerLogDir.getAbsolutePath());
+      AggregatedLogFormat.LogValue logValue =
+          new AggregatedLogFormat.LogValue(
+              Arrays.asList(rootLogDirs.getAbsolutePath()), containerId,
+              ugi.getUserName());
+      writer.append(logKey, logValue);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    return remoteAppLogFile;
+  }
+
+  private void writeLogs(String dirName) throws Exception {
+    File f = new File(dirName + File.separator + "log1");
+    if (!f.getParentFile().exists()) {
+      assertTrue(f.getParentFile().mkdirs());
+    }
+
+    writeLog(dirName + File.separator + "log1", "test log1");
+    writeLog(dirName + File.separator + "log2", "test log2");
+    writeLog(dirName + File.separator + "log3", "test log3");
+  }
+
+  private void writeLog(String fileName, String text) throws Exception {
+    File f = new File(fileName);
+    if (f.exists()) {
+      f.delete();
+    }
+    Writer writer = null;
+    try {
+      writer = new FileWriter(f);
+      writer.write(text);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index b1efa5f..b5ff0e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -164,6 +164,11 @@
     <dependency>
       <groupId>org.fusesource.leveldbjni</groupId>
      <artifactId>leveldbjni-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 20887b6..4db4eb2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -35,6 +35,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
@@ -44,7 +46,6 @@
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -53,9 +54,11 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.CompactedAggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -118,6 +121,12 @@ private final long rollingMonitorInterval;
   private final NodeId nodeId;
 
+  private boolean compactionEnabled;
+  private CuratorFramework curator;
+  private String compactedLogLock;
+  private Path remoteCompactedLogFileForApp;
+  private Path remoteCompactedLogIndexFileForApp;
+
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
@@ -191,6 +200,27 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
       }
       this.rollingMonitorInterval = configuredRollingMonitorInterval;
     }
+    compactionEnabled = false;
+  }
+
+  public AppLogAggregatorImpl(Dispatcher dispatcher,
+      DeletionService deletionService, Configuration conf,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
+      LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+      ContainerLogsRetentionPolicy retentionPolicy,
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs, CuratorFramework curator, String compactedLogLock,
+      Path remoteCompactedLogFileForApp,
+      Path remoteCompactedLogIndexFileForApp) {
+    this(dispatcher, deletionService, conf, appId, userUgi, nodeId, dirsHandler,
+        remoteNodeLogFileForApp, retentionPolicy, appAcls,
+        logAggregationContext, context, lfs);
+    this.curator = curator;
+    this.compactedLogLock = compactedLogLock;
+    this.remoteCompactedLogFileForApp = remoteCompactedLogFileForApp;
+    this.remoteCompactedLogIndexFileForApp = remoteCompactedLogIndexFileForApp;
+    compactionEnabled = true;
   }
 
   private void uploadLogsForContainers() {
@@ -381,11 +411,70 @@ public void run() {
     } finally {
       if (!this.appAggregationFinished.get()) {
         LOG.warn("Aggregation did not complete for application " + appId);
+      } else {
+        if (compactionEnabled) {
+          // Append aggregated log to compacted file
+          doAppendAggregatedLogToCompactedFile();
+        }
       }
       this.appAggregationFinished.set(true);
     }
   }
 
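+  // Serializes appends to the shared compacted log through a per-application
+  // ZooKeeper mutex at <compactedLogLock>/<appId>. Once this node's
+  // aggregated file has been copied in, the per-node upload is deleted so
+  // that only the compacted copy remains.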
+  private void doAppendAggregatedLogToCompactedFile() {
+    LOG.info("Appending aggregated log to compacted aggregated log file for "
+        + appId);
+    InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curator,
+        compactedLogLock + "/" + appId);
+    try {
+      lock.acquire();
+      // Append this Node's log file
+      append(remoteNodeLogFileForApp, remoteCompactedLogFileForApp,
+          remoteCompactedLogIndexFileForApp);
+      // Delete aggregated log file
+      userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws IOException {
+          FileSystem remoteFS = FileSystem.get(conf);
+          remoteFS.delete(remoteNodeLogFileForApp, false);
+          return null;
+        }
+      });
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage(), ioe);
+    } catch (Exception e) {
+      LOG.error("Error compacting aggregated log for " + appId + ": "
+          + e.getMessage(), e);
+    } finally {
+      try {
+        // Only release a lock this process actually acquired
+        if (lock.isAcquiredInThisProcess()) {
+          lock.release();
+        }
+      } catch (Exception e) {
+        LOG.error("Error releasing lock for " + appId + ": " + e.getMessage(),
+            e);
+      }
+    }
+    LOG.info("Done appending aggregated log to compacted aggregated log file for "
+        + appId);
+  }
+
+  private void append(Path sourceLogFile, Path compactedLogFile,
+      Path compactedLogIndex) throws IOException {
+    CompactedAggregatedLogFormat.LogWriter writer = null;
+    AggregatedLogFormat.LogReader reader = null;
+    try {
+      writer = new CompactedAggregatedLogFormat.LogWriter(conf,
+          compactedLogIndex, compactedLogFile, userUgi);
+      reader = new AggregatedLogFormat.LogReader(conf, sourceLogFile);
+      writer.append(reader);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private void doAppLogAggregation() {
     while (!this.appFinishing.get() && !this.aborted.get()) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index bd3e847..2258e5d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -30,6 +30,13 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
@@ -40,6 +47,7 @@
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.curator.ChildReaper;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -97,6 +105,10 @@
   Path remoteRootLogDir;
   String remoteRootLogDirSuffix;
   private NodeId nodeId;
+  private CuratorFramework curator;
+  private ChildReaper lockReaper;
+  private String compactedLogLock;
+  boolean compactionEnabled;
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
@@ -125,6 +137,45 @@ protected void serviceInit(Configuration conf) throws Exception {
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
 
+    if (conf.getBoolean(YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED,
+        YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ENABLED)) {
+      compactionEnabled = true;
+      // TODO: Curator security
+      String connectString = conf.get(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING,
+          YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING);
+      this.compactedLogLock = conf.get(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_LOCK_LOCATION,
+          YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_LOCATION);
+      if (this.compactedLogLock.endsWith("/")) {
+        this.compactedLogLock =
+            this.compactedLogLock.substring(0, this.compactedLogLock.length() - 1);
+      }
+      String compactedLogLockLeader = conf.get(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION,
+          YarnConfiguration.DEFAULT_AGGREGATION_LOG_COMPACTION_LOCK_REAPER_LEADER_LOCATION);
+      if (compactedLogLockLeader.endsWith("/")) {
+        compactedLogLockLeader = compactedLogLockLeader.substring(0,
+            compactedLogLockLeader.length() - 1);
+      }
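+      // A single Curator client per NodeManager, shared by every
+      // AppLogAggregatorImpl for its per-application lock. The ChildReaper
+      // periodically removes empty lock znodes left behind under
+      // compactedLogLock, using leader election on compactedLogLockLeader so
+      // that only one reaper in the cluster is active at a time.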
+      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+      this.curator = CuratorFrameworkFactory.builder()
+          .connectString(connectString)
+          .retryPolicy(retryPolicy)
+          .build();
+      this.curator.start();
+      new EnsurePath(compactedLogLock).ensure(curator.getZookeeperClient());
+      new EnsurePath(compactedLogLockLeader)
+          .ensure(curator.getZookeeperClient());
+      this.lockReaper = new ChildReaper(this.curator, this.compactedLogLock,
+          Reaper.Mode.REAP_INDEFINITELY,
+          ThreadUtils.newFixedThreadScheduledPool(2, "CompactedLogLockReaper"),
+          300, compactedLogLockLeader);
+      this.lockReaper.start();
+    } else {
+      this.compactionEnabled = false;
+    }
+
     super.serviceInit(conf);
   }
@@ -140,6 +191,12 @@ protected void serviceStart() throws Exception {
   protected void serviceStop() throws Exception {
     LOG.info(this.getName() + " waiting for pending aggregation during exit");
     stopAggregators();
+    if (lockReaper != null) {
+      lockReaper.close();
+    }
+    if (curator != null) {
+      curator.close();
+    }
     super.serviceStop();
   }
@@ -226,6 +283,18 @@ Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
         this.remoteRootLogDirSuffix);
   }
 
+  Path getRemoteCompactedLogFileForApp(ApplicationId appId, String user) {
+    return LogAggregationUtils.getRemoteCompactedAggregatedLogFileForApp(
+        this.remoteRootLogDir, appId, user,
+        this.remoteRootLogDirSuffix);
+  }
+
+  Path getRemoteCompactedLogIndexFileForApp(ApplicationId appId, String user) {
+    return LogAggregationUtils.getRemoteCompactedAggregatedLogIndexFileForApp(
+        this.remoteRootLogDir, appId, user,
+        this.remoteRootLogDirSuffix);
+  }
+
   Path getRemoteAppLogDir(ApplicationId appId, String user) {
     return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId,
         user, this.remoteRootLogDirSuffix);
@@ -351,12 +420,24 @@ protected void initAppAggregator(final ApplicationId appId, String user,
     }
 
     // New application
-    final AppLogAggregator appLogAggregator =
-        new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
-            getConfig(), appId, userUgi, this.nodeId, dirsHandler,
-            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
-            appAcls, logAggregationContext, this.context,
-            getLocalFileContext(getConfig()));
+    final AppLogAggregator appLogAggregator;
+    if (compactionEnabled) {
+      appLogAggregator =
+          new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
+              getConfig(), appId, userUgi, this.nodeId, dirsHandler,
+              getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+              appAcls, logAggregationContext, this.context,
+              getLocalFileContext(getConfig()), curator, compactedLogLock,
+              getRemoteCompactedLogFileForApp(appId, user),
+              getRemoteCompactedLogIndexFileForApp(appId, user));
+    } else {
+      appLogAggregator =
+          new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
+              getConfig(), appId, userUgi, this.nodeId, dirsHandler,
+              getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+              appAcls, logAggregationContext, this.context,
+              getLocalFileContext(getConfig()));
+    }
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 7d911e9..fe8c11d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -57,6 +57,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -98,6 +99,7 @@
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.CompactedAggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -299,6 +301,78 @@ public void testNoContainerOnNode() throws Exception {
     logAggregationService.close();
   }
 
+  @Test (timeout = 50000)
+  public void testLogAggregationServiceWithCompaction() throws Exception {
+    TestingServer zkServer = null;
+    try {
+      zkServer = new TestingServer();
+      zkServer.start();
+      this.conf.setBoolean(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_ENABLED, true);
+      this.conf.set(
+          YarnConfiguration.AGGREGATION_LOG_COMPACTION_ZOOKEEPER_CONNECTION_STRING,
+          zkServer.getConnectString());
+      this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+          localLogDir.getAbsolutePath());
+      this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+          this.remoteRootLogDir.getAbsolutePath());
+
+      LogAggregationService logAggregationService = new LogAggregationService(
+          dispatcher, this.context, this.delSrvc, super.dirsHandler);
+      logAggregationService.init(this.conf);
+      logAggregationService.start();
+
+      ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+
+      // AppLogDir should be created
+      File app1LogDir =
+          new File(localLogDir, ConverterUtils.toString(application1));
+      app1LogDir.mkdir();
+      logAggregationService
+          .handle(new LogHandlerAppStartedEvent(
+              application1, this.user, null,
+              ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+      ApplicationAttemptId appAttemptId =
+          BuilderUtils.newApplicationAttemptId(application1, 1);
+      ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
+      // Simulate log-file creation
+      String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
+      writeContainerLogs(app1LogDir, container11, logFiles1);
+      logAggregationService.handle(
+          new LogHandlerContainerFinishedEvent(container11, 0));
+
+      logAggregationService.handle(new LogHandlerAppFinishedEvent(
+          application1));
+
+      logAggregationService.stop();
+      assertEquals(0, logAggregationService.getNumAggregators());
+
+      dispatcher.await();
+
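+      // Compaction is expected to be transparent to the application event
+      // flow: the same INITED/FINISHED pair as in the non-compacted tests.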
+      ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+          new ApplicationEvent(
+              application1,
+              ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+          new ApplicationEvent(
+              application1,
+              ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+      };
+      checkEvents(appEventHandler, expectedEvents, true, "getType",
+          "getApplicationID");
+      dispatcher.stop();
+      logAggregationService.close();
+
+      verifyContainerLogs(logAggregationService, application1,
+          new ContainerId[] {container11}, logFiles1, 3);
+
+    } finally {
+      if (zkServer != null) {
+        zkServer.stop();
+        zkServer.close();
+      }
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testMultipleAppsLogAggregation() throws Exception {
@@ -662,7 +736,7 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
     Exception e = new RuntimeException("KABOOM!");
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
-          any(ApplicationId.class), any(UserGroupInformation.class));
+        any(ApplicationId.class), any(UserGroupInformation.class));
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
         this.user, null,
         ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
@@ -799,28 +873,83 @@ private String verifyContainerLogs(LogAggregationService logAggregationService,
       key = new LogKey();
       valueStream = reader.next(key);
     }
+      verifyContainerLogsHelper(expectedContainerIds, logFiles,
+          numOfContainerLogs, logMap);
+      return targetNodeFile.getPath().getName();
+    } finally {
+      reader.close();
+    }
+  }
-      // 1 for each container
-      Assert.assertEquals(expectedContainerIds.length, logMap.size());
-      for (ContainerId cId : expectedContainerIds) {
-        String containerStr = ConverterUtils.toString(cId);
-        Map<String, String> thisContainerMap = logMap.remove(containerStr);
-        Assert.assertEquals(numOfContainerLogs, thisContainerMap.size());
-        for (String fileType : logFiles) {
-          String expectedValue = containerStr + " Hello " + fileType + "!";
-          LOG.info("Expected log-content : " + new String(expectedValue));
-          String foundValue = thisContainerMap.remove(fileType);
-          Assert.assertNotNull(cId + " " + fileType
-              + " not present in aggregated log-file!", foundValue);
-          Assert.assertEquals(expectedValue, foundValue);
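+  // Compacted-format counterpart of verifyContainerLogs above: reads the
+  // shared index and compacted log files instead of per-node files, then
+  // applies the same per-container assertions via verifyContainerLogsHelper.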
+  private void verifyContainerLogs(LogAggregationService logAggregationService,
+      ApplicationId appId, ContainerId[] expectedContainerIds,
+      String[] logFiles, int numOfContainerLogs) throws IOException {
+    FileSystem fs = FileSystem.get(this.conf);
+    Path indexFile = logAggregationService
+        .getRemoteCompactedLogIndexFileForApp(appId, this.user);
+    Path compactedLogFile = logAggregationService
+        .getRemoteCompactedLogFileForApp(appId, this.user);
+    Assert.assertTrue(fs.exists(indexFile));
+    Assert.assertTrue(fs.exists(compactedLogFile));
+
+    CompactedAggregatedLogFormat.LogReader reader = null;
+    try {
+      reader = new CompactedAggregatedLogFormat.LogReader(conf, indexFile,
+          compactedLogFile);
+      Assert.assertEquals(this.user, reader.getApplicationOwner());
+      verifyAcls(reader.getApplicationAcls());
+
+      Map<String, Map<String, String>> logMap =
+          new HashMap<String, Map<String, String>>();
+      for (String container : reader.getContainerIds()) {
+        LOG.info("Found container " + container);
+        Map<String, String> perContainerMap = new HashMap<String, String>();
+        logMap.put(container, perContainerMap);
+        AggregatedLogFormat.ContainerLogsReader logReader =
+            reader.getContainerLogsReader(ContainerId.fromString(container));
+        while (logReader.nextLog() != null) {
+          String fileType = logReader.getCurrentLogType();
+          long fileLength = logReader.getCurrentLogLength();
+          char[] buf = new char[(int) fileLength];
+          logReader.read(buf, 0, buf.length);
+          String logContents = new String(buf);
+          perContainerMap.put(fileType, logContents);
+          LOG.info("LogType:" + fileType);
+          LOG.info("LogLength:" + fileLength);
+          LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
         }
-        Assert.assertEquals(0, thisContainerMap.size());
       }
-      Assert.assertEquals(0, logMap.size());
-      return targetNodeFile.getPath().getName();
+      verifyContainerLogsHelper(expectedContainerIds, logFiles,
+          numOfContainerLogs, logMap);
     } finally {
-      reader.close();
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  private void verifyContainerLogsHelper(ContainerId[] expectedContainerIds,
+      String[] logFiles, int numOfContainerLogs,
+      Map<String, Map<String, String>> logMap) {
+    // 1 for each container
+    Assert.assertEquals(expectedContainerIds.length, logMap.size());
+    for (ContainerId cId : expectedContainerIds) {
+      String containerStr = ConverterUtils.toString(cId);
+      Map<String, String> thisContainerMap = logMap.remove(containerStr);
+      Assert.assertEquals(numOfContainerLogs, thisContainerMap.size());
+      for (String fileType : logFiles) {
+        String expectedValue = containerStr + " Hello " + fileType + "!";
+        LOG.info("Expected log-content : " + new String(expectedValue));
+        String foundValue = thisContainerMap.remove(fileType);
+        Assert.assertNotNull(cId + " " + fileType
+            + " not present in aggregated log-file!", foundValue);
+        Assert.assertEquals(expectedValue, foundValue);
+      }
+      Assert.assertEquals(0, thisContainerMap.size());
     }
+    Assert.assertEquals(0, logMap.size());
   }
 
   @Test