diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index debe770..7242e50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -190,7 +190,7 @@ public LogValue(List rootLogDirs, ContainerId containerId, this.appFinished = appFinished; } - private Set getPendingLogFilesToUploadForThisContainer() { + Set getPendingLogFilesToUploadForThisContainer() { Set pendingUploadFiles = new HashSet(); for (String rootLogDir : this.rootLogDirs) { File appLogDir = @@ -224,53 +224,61 @@ public void write(DataOutputStream out, Set pendingUploadFiles) LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it."); continue; } + write(out, logFile, false); + } + } - FileInputStream in = null; - try { - in = secureOpenFile(logFile); - } catch (IOException e) { - logErrorMessage(logFile, e); - IOUtils.cleanup(LOG, in); - continue; - } + long write(DataOutputStream out, File logFile, boolean contentOnly) + throws IOException { + FileInputStream in = null; + try { + in = secureOpenFile(logFile); + } catch (IOException e) { + logErrorMessage(logFile, e); + IOUtils.cleanup(LOG, in); + return -1; + } + + final long fileLength = logFile.length(); - final long fileLength = logFile.length(); + if (!contentOnly) { // Write the logFile Type out.writeUTF(logFile.getName()); // Write the log length as UTF so that it is printable out.writeUTF(String.valueOf(fileLength)); + } - // Write the log itself - try { - byte[] buf = new byte[65535]; - int len = 0; - long bytesLeft = fileLength; - while ((len = in.read(buf)) != -1) { - //If buffer contents within fileLength, write - if (len < bytesLeft) { - out.write(buf, 0, len); - bytesLeft-=len; - } - //else only write contents within fileLength, then exit early - else { - out.write(buf, 0, (int)bytesLeft); - break; - } + // Write the log itself + try { + byte[] buf = new byte[65535]; + int len = 0; + long bytesLeft = fileLength; + while ((len = in.read(buf)) != -1) { + //If buffer contents within fileLength, write + if (len < bytesLeft) { + out.write(buf, 0, len); + bytesLeft-=len; } - long newLength = logFile.length(); - if(fileLength < newLength) { - LOG.warn("Aggregated logs truncated by approximately "+ - (newLength-fileLength) +" bytes."); + //else only write contents within fileLength, then exit early + else { + out.write(buf, 0, (int)bytesLeft); + break; } - this.uploadedFiles.add(logFile); - } catch (IOException e) { - String message = logErrorMessage(logFile, e); - out.write(message.getBytes(Charset.forName("UTF-8"))); - } finally { - IOUtils.cleanup(LOG, in); } + long newLength = logFile.length(); + if(fileLength < newLength) { + LOG.warn("Aggregated logs truncated by approximately "+ + (newLength-fileLength) +" bytes."); + } + this.uploadedFiles.add(logFile); + } catch (IOException e) { + String message = logErrorMessage(logFile, e); + out.write(message.getBytes(Charset.forName("UTF-8"))); + } finally { + IOUtils.cleanup(LOG, in); } + return fileLength; } @VisibleForTesting @@ -463,7 +471,7 @@ public void close() { @Public @Evolving - public static class LogReader { + public static class LogReader implements LogFormatReader { private final FSDataInputStream fsDataIStream; private final TFile.Reader.Scanner scanner; @@ -488,6 +496,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile) * @return the application owner. * @throws IOException */ + @Override public String getApplicationOwner() throws IOException { TFile.Reader.Scanner ownerScanner = reader.createScanner(); LogKey key = new LogKey(); @@ -510,6 +519,7 @@ public String getApplicationOwner() throws IOException { * @return a map of the Application ACLs. * @throws IOException */ + @Override public Map getApplicationAcls() throws IOException { // TODO Seek directly to the key once a comparator is specified. @@ -580,7 +590,7 @@ public DataInputStream next(LogKey key) throws IOException { * logs could not be found * @throws IOException */ - @Private + @Override public ContainerLogsReader getContainerLogsReader( ContainerId containerId) throws IOException { ContainerLogsReader logReader = null; @@ -799,23 +809,21 @@ public static int readContainerLogsForALogType( } } + @Override public void close() { IOUtils.cleanup(LOG, scanner, reader, fsDataIStream); } } - @Private - public static class ContainerLogsReader { - private DataInputStream valueStream; - private String currentLogType = null; - private long currentLogLength = 0; - private BoundedInputStream currentLogData = null; - private InputStreamReader currentLogISR; + public static class ContainerLogsReader extends + org.apache.hadoop.yarn.logaggregation.ContainerLogsReader{ + protected DataInputStream valueStream; public ContainerLogsReader(DataInputStream stream) { valueStream = stream; } + @Override public String nextLog() throws IOException { if (currentLogData != null && currentLogLength > 0) { // seek to the end of the current log, relying on BoundedInputStream @@ -847,29 +855,5 @@ public String nextLog() throws IOException { return currentLogType; } - - public String getCurrentLogType() { - return currentLogType; - } - - public long getCurrentLogLength() { - return currentLogLength; - } - - public long skip(long n) throws IOException { - return currentLogData.skip(n); - } - - public int read() throws IOException { - return currentLogData.read(); - } - - public int read(byte[] buf, int off, int len) throws IOException { - return currentLogData.read(buf, off, len); - } - - public int read(char[] buf, int off, int len) throws IOException { - return currentLogISR.read(buf, off, len); - } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ConcatenatableAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ConcatenatableAggregatedLogFormat.java new file mode 100644 index 0000000..b9e4bde --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ConcatenatableAggregatedLogFormat.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.commons.io.input.BoundedInputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ConcatenatableAggregatedLogFormat { + + private static final Log LOG = + LogFactory.getLog(ConcatenatableAggregatedLogFormat.class); + private static final int CURRENT_VERSION = 1; + private static final String DELIMITER = ","; + private static final String LINE_DELIMITER = System.lineSeparator(); + + /** + * Umask for the log file. + */ + private static final FsPermission APP_LOG_FILE_UMASK = FsPermission + .createImmutable((short) (0640 ^ 0777)); + + public ConcatenatableAggregatedLogFormat() { + } + + public static class LogWriter { + + private final FSDataOutputStream fsDataOStream; + private final OutputStreamWriter indexWriter; + private long currentOffset; + private int indexLineCount; + + public static LogWriter createNew(final Configuration conf, + final Path indexFile, final Path logFile, + final UserGroupInformation userUgi, String owner, + Map appAcls) throws IOException { + FileContext fc; + try { + fc = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public FileContext run() throws Exception { + FileContext fc = FileContext.getFileContext(conf); + fc.setUMask(APP_LOG_FILE_UMASK); + return fc; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + + // Check that the files don't already exist + try { + if (fc.getFileStatus(indexFile).isFile()) { + throw new IOException("Index file " + indexFile + " already exists"); + } + } catch (FileNotFoundException fne) { + // expected + } + try { + if (fc.getFileStatus(logFile).isFile()) { + throw new IOException("Log file " + logFile + " already exists"); + } + } catch (FileNotFoundException fne) { + // expected + } + + LogWriter writer = new LogWriter(fc, indexFile, logFile); + writer.writeMetadataHeader(owner, appAcls); + return writer; + } + + public static LogWriter openExisting(final Configuration conf, + final Path indexFile, final Path logFile, + final UserGroupInformation userUgi) throws IOException { + FileContext fc; + try { + fc = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public FileContext run() throws Exception { + FileContext fc = FileContext.getFileContext(conf); + fc.setUMask(APP_LOG_FILE_UMASK); + return fc; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + if (!fc.getFileStatus(indexFile).isFile()) { + throw new IOException("Index file " + indexFile + " does not exist"); + } + if (!fc.getFileStatus(logFile).isFile()) { + throw new IOException("Log file " + logFile + " does not exist"); + } + + LogWriter writer = new LogWriter(fc, indexFile, logFile); + return writer; + } + + private LogWriter(final FileContext fc, final Path indexFile, + final Path logFile) throws IOException { + + try { + currentOffset = fc.getFileStatus(logFile).getLen(); + + // Verify the header is not missing any lines + if (currentOffset > 0l) { + Scanner indexScanner = null; + try { + indexScanner = new Scanner(fc.open(indexFile), "utf-8"); + indexLineCount = 0; + while (indexLineCount < 3 && indexScanner.hasNextLine()) { + indexScanner.nextLine(); + indexLineCount++; + } + } finally { + org.apache.hadoop.io.IOUtils.closeStream(indexScanner); + } + } + } catch (FileNotFoundException fnfe) { + currentOffset = 0l; + indexLineCount = 0; + } + + this.fsDataOStream = fc.create( + logFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), + new Options.CreateOpts[]{}); + + indexWriter = new OutputStreamWriter(fc.create( + indexFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), + new Options.CreateOpts[]{}), "utf-8"); + } + + private void writeMetadataHeader(String owner, Map appAcls) throws IOException { + indexWriter.write(Integer.toString(CURRENT_VERSION)); + indexLineCount++; + writeApplicationACLs(appAcls); + indexLineCount++; + indexWriter.write(LINE_DELIMITER + owner); + indexLineCount++; + } + + private void writeApplicationACLs(Map appAcls) throws IOException { + indexWriter.write(LINE_DELIMITER + appAcls.size()); + for (Map.Entry entry : appAcls.entrySet()) { + indexWriter.write(DELIMITER + entry.getKey() + + DELIMITER + entry.getValue()); + } + } + + public void append(ContainerId containerId, + AggregatedLogFormat.LogValue logValue) throws IOException { + Set pendingUploadFiles = + logValue.getPendingLogFilesToUploadForThisContainer(); + for (File f : pendingUploadFiles) { + indexWriter.write(LINE_DELIMITER + containerId + DELIMITER + f.getName()); + long length = logValue.write(fsDataOStream, f, true); + if (length != -1) { + indexWriter.write(DELIMITER + currentOffset + DELIMITER + length); + currentOffset += length; + } + } + } + + public void close() { + org.apache.hadoop.io.IOUtils.closeStream(indexWriter); + org.apache.hadoop.io.IOUtils.closeStream(fsDataOStream); + } + } + + public static class LogReader implements LogFormatReader { + + private final FSDataInputStream fsDataIStream; + private Map acls; + private String owner; + private Map> index; + + public LogReader(Configuration conf, Path indexFile, Path logFile) + throws IOException { + FileContext fileContext = FileContext.getFileContext(conf); + + this.fsDataIStream = fileContext.open(logFile); + + Scanner indexScanner = null; + try { + indexScanner = new Scanner(fileContext.open(indexFile), "utf-8"); + loadIndexFile(indexScanner); + } finally { + org.apache.hadoop.io.IOUtils.closeStream(indexScanner); + } + } + + @Override + public Map getApplicationAcls() { + return acls; + } + + @Override + public String getApplicationOwner() { + return owner; + } + + private void readApplicationACLs(Scanner indexScanner) throws IOException { + String line = indexScanner.nextLine(); + Scanner sc = new Scanner(line); + sc.useDelimiter(DELIMITER); + int numAcls = sc.nextInt(); + acls = new HashMap(numAcls); + for (int i = 0; i < numAcls; i++) { + String appAccessOp = sc.next(); + String aclString = sc.next(); + acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString); + } + } + + private void loadIndexFile(Scanner indexScanner) throws IOException { + String line = indexScanner.nextLine(); + int version = Integer.parseInt(line); + if (version != CURRENT_VERSION) { + throw new IOException("Incorrect version: expected " + CURRENT_VERSION + + " but found " + version); + } + readApplicationACLs(indexScanner); + owner = indexScanner.nextLine(); + + index = new HashMap>(); + while (indexScanner.hasNextLine()) { + Scanner sc = new Scanner(indexScanner.nextLine()); + sc.useDelimiter(DELIMITER); + try { + String containerId = sc.next(); + String logType = sc.next(); + long offset = sc.nextLong(); + long length = sc.nextLong(); + addIndexEntry(containerId, logType, offset, length); + } catch (Exception e) { + // this entry is invalid or incomplete; skip it + LOG.debug("Invalid index entry; skipping", e); + } + } + } + + private void addIndexEntry(String containerId, String logType, long offset, + long length) { + Map logTypes = index.get(containerId); + if (logTypes == null) { + logTypes = new HashMap(); + } + IndexEntry iEntry = new IndexEntry(offset, length); + logTypes.put(logType, iEntry); + index.put(containerId, logTypes); + } + + public Set getContainerIds() { + return index.keySet(); + } + + /** + * Get a ContainerLogsReader to read the logs for + * the specified container. + * + * @param containerId + * @return object to read the container's logs or null if the + * logs could not be found + * @throws IOException + */ + @Override + public ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException { + ContainerLogsReader logReader = null; + Map logTypes = index.get(containerId.toString()); + if (logTypes != null) { + logReader = new ContainerLogsReader(fsDataIStream, logTypes); + } + return logReader; + } + + @Override + public void close() { + org.apache.hadoop.io.IOUtils.closeStream(fsDataIStream); + } + } + + private static class IndexEntry { + private long offset; + private long length; + + public IndexEntry(long offset, long length) { + this.offset = offset; + this.length = length; + } + + public long getOffset() { + return offset; + } + + public long getLength() { + return length; + } + } + + public static class ContainerLogsReader extends + org.apache.hadoop.yarn.logaggregation.ContainerLogsReader { + private Iterator> logTypesIterator; + private FSDataInputStream valueStream; + + public ContainerLogsReader(FSDataInputStream stream, Map logTypes) { + valueStream = stream; + logTypesIterator = logTypes.entrySet().iterator(); + } + + @Override + public String nextLog() throws IOException { + if (logTypesIterator.hasNext()) { + Map.Entry currentLogEntry = logTypesIterator.next(); + currentLogType = currentLogEntry.getKey(); + currentLogLength = currentLogEntry.getValue().getLength(); + long currentLogOffset = currentLogEntry.getValue().getOffset(); + valueStream.seek(currentLogOffset); + currentLogData = new BoundedInputStream(valueStream, currentLogLength); + currentLogISR = new InputStreamReader(currentLogData, + Charset.forName("UTF-8")); + } else { + currentLogType = null; + currentLogData = null; + currentLogISR = null; + currentLogLength = 0; + } + return currentLogType; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsReader.java new file mode 100644 index 0000000..a78954d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsReader.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.io.InputStreamReader; + +public abstract class ContainerLogsReader { + protected String currentLogType = null; + protected long currentLogLength = 0; + protected BoundedInputStream currentLogData = null; + protected InputStreamReader currentLogISR; + + public abstract String nextLog() throws IOException; + + public String getCurrentLogType() { + return currentLogType; + } + + public long getCurrentLogLength() { + return currentLogLength; + } + + public long skip(long n) throws IOException { + return currentLogData.skip(n); + } + + public int read() throws IOException { + return currentLogData.read(); + } + + public int read(byte[] buf, int off, int len) throws IOException { + return currentLogData.read(buf, off, len); + } + + public int read(char[] buf, int off, int len) throws IOException { + return currentLogISR.read(buf, off, len); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java new file mode 100644 index 0000000..b3409bc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.io.IOException; +import java.util.Map; + +public interface LogFormatReader { + + public String getApplicationOwner() throws IOException; + + public Map getApplicationAcls() + throws IOException; + + public ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException; + + public void close(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestConcatenatableAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestConcatenatableAggregatedLogFormat.java new file mode 100644 index 0000000..adb3955 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestConcatenatableAggregatedLogFormat.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.TestContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +public class TestConcatenatableAggregatedLogFormat { + + private static final File testWorkDir = new File("target", + "TestConcatenatableAggregatedLogFormat"); + private static final Configuration conf = new Configuration(); + private static final FileSystem fs; + + static { + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Before + public void setup() throws Exception { + Path workDirPath = new Path(testWorkDir.getAbsolutePath()); + if (fs.exists(workDirPath)) { + fs.delete(workDirPath, true); + } + fs.mkdirs(workDirPath); + } + + @After + public void cleanup() throws Exception { + Path workDirPath = new Path(testWorkDir.getAbsolutePath()); + fs.delete(workDirPath, true); + } + + @Test + public void testReaderWriter() throws Exception { + File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs"); + Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(), + "logs"); + ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1); + ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2); + ContainerId containerIdC = TestContainerId.newContainerId(1, 1, 1, 3); + + writeLocalLogs(rootLogDirs, containerIdA); + writeLocalLogs(rootLogDirs, containerIdB); + writeLocalLogs(rootLogDirs, containerIdC); + + Path remoteAppDir = new Path(remoteAggregatedLogDirs, + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + if (!fs.exists(remoteAppDir)) { + assertTrue(fs.mkdirs(remoteAppDir)); + } + + Path indexFile = + new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString() + + ".index"); + Path logFile = new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Map appAcls = + new HashMap(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + // Write two containers after creating new file + ConcatenatableAggregatedLogFormat.LogWriter cWriter = null; + try { + cWriter = ConcatenatableAggregatedLogFormat.LogWriter.createNew( + conf, indexFile, logFile, ugi, ugi.getShortUserName(), appAcls); + AggregatedLogFormat.LogValue logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerIdA, + ugi.getUserName()); + cWriter.append(containerIdA, logValue); + logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerIdB, + ugi.getUserName()); + cWriter.append(containerIdB, logValue); + } finally { + if (cWriter != null) { + cWriter.close(); + } + } + // Write one container after opening existing file + cWriter = null; + try { + cWriter = ConcatenatableAggregatedLogFormat.LogWriter.openExisting( + conf, indexFile, logFile, ugi); + AggregatedLogFormat.LogValue logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerIdC, + ugi.getUserName()); + cWriter.append(containerIdC, logValue); + } finally { + if (cWriter != null) { + cWriter.close(); + } + } + + ConcatenatableAggregatedLogFormat.LogReader cReader = null; + try { + cReader = + new ConcatenatableAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(3, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdA.toString())); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + Assert.assertTrue(containerIds.contains(containerIdC.toString())); + for (ContainerId containerId : + new ContainerId[]{containerIdA, containerIdB, containerIdC}) { + ConcatenatableAggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerId); + Map readLogs = new HashMap(3); + while (logReader.nextLog() != null) { + String logType = logReader.getCurrentLogType(); + Assert.assertFalse(readLogs.containsKey(logType)); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + readLogs.put(logType, new String(buf)); + } + Assert.assertEquals(3, readLogs.size()); + Assert.assertEquals("test log1", readLogs.get("log1")); + Assert.assertEquals("test log2", readLogs.get("log2")); + Assert.assertEquals("test log3", readLogs.get("log3")); + } + } finally { + if (cReader != null) { + cReader.close(); + } + } + } + + @Test + public void testIncompleteIndexReaderWriter() throws Exception { + File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs"); + Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(), + "logs"); + ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1); + ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2); + ContainerId containerIdC = TestContainerId.newContainerId(1, 1, 1, 3); + + writeLocalLogs(rootLogDirs, containerIdA); + writeLocalLogs(rootLogDirs, containerIdB); + writeLocalLogs(rootLogDirs, containerIdC); + + Path remoteAppDir = new Path(remoteAggregatedLogDirs, + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + if (!fs.exists(remoteAppDir)) { + assertTrue(fs.mkdirs(remoteAppDir)); + } + + Path indexFile = + new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString() + + ".index"); + Path logFile = new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Map appAcls = + new HashMap(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + // Write two containers after creating new file + ConcatenatableAggregatedLogFormat.LogWriter cWriter = null; + try { + cWriter = ConcatenatableAggregatedLogFormat.LogWriter.createNew( + conf, indexFile, logFile, ugi, ugi.getShortUserName(), appAcls); + AggregatedLogFormat.LogValue logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerIdA, + ugi.getUserName()); + cWriter.append(containerIdA, logValue); + logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerIdB, + ugi.getUserName()); + cWriter.append(containerIdB, logValue); + } finally { + if (cWriter != null) { + cWriter.close(); + } + } + + // Pretend the host got interrupted while writing the index for + // container B's "log2" and "log3" log types by making those index entries + // incomplete + StringBuilder sb = new StringBuilder(); + Scanner sc = null; + Writer writer = null; + try { + sc = new Scanner(new File(indexFile.toString())); + while(sc.hasNextLine()) { + String line = sc.nextLine(); + if (line.startsWith(containerIdB.toString() + ",log2")) { + line = containerIdB.toString() + ",log2"; + } else + if (line.startsWith(containerIdB.toString() + ",log3")) { + line = containerIdB.toString() + ",log3"; + } + sb.append(line).append(System.lineSeparator()); + } + sc.close(); + writer = new FileWriter(new File(indexFile.toString())); + writer.write(sb.toString()); + } finally { + if (sc != null) { + sc.close(); + } + if (writer != null) { + writer.close(); + } + } + + // Write one container after opening existing file + cWriter = null; + try { + cWriter = ConcatenatableAggregatedLogFormat.LogWriter.openExisting( + conf, indexFile, logFile, ugi); + AggregatedLogFormat.LogValue logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerIdC, + ugi.getUserName()); + cWriter.append(containerIdC, logValue); + } finally { + if (cWriter != null) { + cWriter.close(); + } + } + + ConcatenatableAggregatedLogFormat.LogReader cReader = null; + try { + cReader = + new ConcatenatableAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(3, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdA.toString())); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + Assert.assertTrue(containerIds.contains(containerIdC.toString())); + for (ContainerId containerId : + new ContainerId[]{containerIdA, containerIdB, containerIdC}) { + ConcatenatableAggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerId); + Map readLogs = new HashMap(3); + while (logReader.nextLog() != null) { + String logType = logReader.getCurrentLogType(); + Assert.assertFalse(readLogs.containsKey(logType)); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + readLogs.put(logType, new String(buf)); + } + if (containerId.equals(containerIdB)) { + Assert.assertEquals(1, readLogs.size()); + Assert.assertEquals("test log1", readLogs.get("log1")); + } else { + Assert.assertEquals(3, readLogs.size()); + Assert.assertEquals("test log1", readLogs.get("log1")); + Assert.assertEquals("test log2", readLogs.get("log2")); + Assert.assertEquals("test log3", readLogs.get("log3")); + } + } + } finally { + if (cReader != null) { + cReader.close(); + } + } + } + +public void writeLocalLogs(File rootLogDirs, ContainerId containerId) throws IOException { + File appLogDir = new File(rootLogDirs, + containerId.getApplicationAttemptId().getApplicationId().toString()); + File containerLogDir = new File(appLogDir, containerId.toString()); + writeLogs(containerLogDir.getAbsolutePath()); +} + + private void writeLogs(String dirName) throws IOException { + File f = new File(dirName + File.separator + "log1"); + if (!f.getParentFile().exists()) { + assertTrue(f.getParentFile().mkdirs()); + } + + writeLog(dirName + File.separator + "log1", "test log1"); + writeLog(dirName + File.separator + "log2", "test log2"); + writeLog(dirName + File.separator + "log3", "test log3"); + } + + private void writeLog(String fileName, String text) throws IOException { + File f = new File(fileName); + if (f.exists()) { + f.delete(); + } + Writer writer = null; + try { + writer = new FileWriter(f); + writer.write(text); + } finally { + if (writer != null) { + writer.close(); + } + } + } +}