diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 0590535..aeef574 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -226,10 +226,12 @@ public abstract void renderAggregatedLogsBlock(Block html, * Returns the owner of the application. * * @param aggregatedLogPath the aggregatedLog path + * @param appId the ApplicationId * @return the application owner * @throws IOException if we can not get the application owner */ - public abstract String getApplicationOwner(Path aggregatedLogPath) + public abstract String getApplicationOwner(Path aggregatedLogPath, + ApplicationId appId) throws IOException; /** @@ -237,11 +239,12 @@ public abstract String getApplicationOwner(Path aggregatedLogPath) * found. * * @param aggregatedLogPath the aggregatedLog path. + * @param appId the ApplicationId * @return a map of the Application ACLs. * @throws IOException if we can not get the application acls */ public abstract Map getApplicationAcls( - Path aggregatedLogPath) throws IOException; + Path aggregatedLogPath, ApplicationId appId) throws IOException; /** * Verify and create the remote log directory. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java index 6d48d7a..c53ffcc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java @@ -135,7 +135,7 @@ protected void render(Block html) { IndexedLogsMeta indexedLogsMeta = null; try { indexedLogsMeta = fileController.loadIndexedLogsMeta( - thisNodeFile.getPath(), endIndex); + thisNodeFile.getPath(), endIndex, appId); } catch (Exception ex) { // DO NOTHING LOG.warn("Can not load log meta from the log file:" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 800c0a2..56bae26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -284,16 +284,8 @@ private Path initializeWriterInRolling(final Path remoteLogFile, currentRemoteLogFile.getName())) { overwriteCheckSum = false; long endIndex = checksumFileInputStream.readLong(); - IndexedLogsMeta recoveredLogsMeta = null; - try { - truncateFileWithRetries(fc, currentRemoteLogFile, - endIndex); - recoveredLogsMeta = loadIndexedLogsMeta( - currentRemoteLogFile); - } catch (Exception ex) { - recoveredLogsMeta = loadIndexedLogsMeta( - currentRemoteLogFile, endIndex); - } + IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta( + currentRemoteLogFile, endIndex, appId); if (recoveredLogsMeta != null) { indexedLogsMeta = recoveredLogsMeta; } @@ -524,11 +516,11 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, IndexedLogsMeta indexedLogsMeta = null; try { indexedLogsMeta = loadIndexedLogsMeta(thisNodeFile.getPath(), - endIndex); + endIndex, appId); } catch (Exception ex) { // DO NOTHING LOG.warn("Can not load log meta from the log file:" - + thisNodeFile.getPath()); + + thisNodeFile.getPath() + "\n" + ex.getMessage()); continue; } if (indexedLogsMeta == null) { @@ -636,14 +628,14 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, endIndex = checkSumIndex.longValue(); } IndexedLogsMeta current = loadIndexedLogsMeta( - thisNodeFile.getPath(), endIndex); + thisNodeFile.getPath(), endIndex, appId); if (current != null) { listOfLogsMeta.add(current); } } catch (IOException ex) { // DO NOTHING LOG.warn("Can not get log meta from the log file:" - + thisNodeFile.getPath()); + + thisNodeFile.getPath() + "\n" + ex.getMessage()); } } for (IndexedLogsMeta indexedLogMeta : listOfLogsMeta) { @@ -721,6 +713,7 @@ public boolean apply(FileStatus next) { checkSumFiles.put(nodeName, Long.valueOf(index)); } } catch (IOException ex) { + LOG.warn(ex.getMessage()); continue; } finally { IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); @@ -773,25 +766,26 @@ public void renderAggregatedLogsBlock(Block html, ViewContext context) { } @Override - public String getApplicationOwner(Path aggregatedLogPath) + public String getApplicationOwner(Path aggregatedLogPath, + ApplicationId appId) throws IOException { if (this.cachedIndexedLogsMeta == null || !this.cachedIndexedLogsMeta.getRemoteLogPath() .equals(aggregatedLogPath)) { this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( - loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); + loadIndexedLogsMeta(aggregatedLogPath, appId), aggregatedLogPath); } return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getUser(); } @Override public Map getApplicationAcls( - Path aggregatedLogPath) throws IOException { + Path aggregatedLogPath, ApplicationId appId) throws IOException { if (this.cachedIndexedLogsMeta == null || !this.cachedIndexedLogsMeta.getRemoteLogPath() .equals(aggregatedLogPath)) { this.cachedIndexedLogsMeta = new CachedIndexedLogsMeta( - loadIndexedLogsMeta(aggregatedLogPath), aggregatedLogPath); + loadIndexedLogsMeta(aggregatedLogPath, appId), aggregatedLogPath); } return this.cachedIndexedLogsMeta.getCachedIndexedLogsMeta().getAcls(); } @@ -804,8 +798,8 @@ public Path getRemoteAppLogDir(ApplicationId appId, String user) } @Private - public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end) - throws IOException { + public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end, + ApplicationId appId) throws IOException { FileContext fileContext = FileContext.getFileContext(remoteLogPath.toUri(), conf); FSDataInputStream fsDataIStream = null; @@ -816,8 +810,36 @@ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end) } long fileLength = end < 0 ? fileContext.getFileStatus( remoteLogPath).getLen() : end; + fsDataIStream.seek(fileLength - Integer.SIZE/ Byte.SIZE - UUID_LENGTH); int offset = fsDataIStream.readInt(); + // If the offset/log meta size is larger than 64M, + // output a warn message for better debug. + if (offset > 64 * 1024 * 1024) { + LOG.warn("The log meta size read from " + remoteLogPath + + " is " + offset); + } + + // Load UUID and make sure the UUID is correct. + byte[] uuidRead = new byte[UUID_LENGTH]; + int uuidReadLen = fsDataIStream.read(uuidRead); + if (this.uuid == null) { + this.uuid = createUUID(appId); + } + if (uuidReadLen != UUID_LENGTH || !Arrays.equals(this.uuid, uuidRead)) { + if (LOG.isDebugEnabled()) { + LOG.debug("the length of loaded UUID:" + uuidReadLen); + LOG.debug("the loaded UUID:" + new String(uuidRead, + Charset.forName("UTF-8"))); + LOG.debug("the expected UUID:" + new String(this.uuid, + Charset.forName("UTF-8"))); + } + throw new IOException("The UUID from " + + remoteLogPath + " is not correct. The offset of loaded UUID is " + + (fileLength - UUID_LENGTH)); + } + + // Load Log Meta byte[] array = new byte[offset]; fsDataIStream.seek( fileLength - offset - Integer.SIZE/ Byte.SIZE - UUID_LENGTH); @@ -833,9 +855,9 @@ public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end) } } - private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath) - throws IOException { - return loadIndexedLogsMeta(remoteLogPath, -1); + private IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, + ApplicationId appId) throws IOException { + return loadIndexedLogsMeta(remoteLogPath, -1, appId); } /** @@ -1040,6 +1062,7 @@ private static String logErrorMessage(File logFile, Exception e) { this.out = compressAlgo.createCompressionStream( fsBufferedOutput, compressor, 0); } catch (IOException e) { + LOG.warn(e.getMessage()); compressAlgo.returnCompressor(compressor); throw e; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index 5064e26..a4f50d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -335,14 +335,15 @@ public void renderAggregatedLogsBlock(Block html, ViewContext context) { } @Override - public String getApplicationOwner(Path aggregatedLog) throws IOException { + public String getApplicationOwner(Path aggregatedLog, ApplicationId appId) + throws IOException { createTFileLogReader(aggregatedLog); return this.tfReader.getLogReader().getApplicationOwner(); } @Override public Map getApplicationAcls( - Path aggregatedLog) throws IOException { + Path aggregatedLog, ApplicationId appId) throws IOException { createTFileLogReader(aggregatedLog); return this.tfReader.getLogReader().getApplicationAcls(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java index 2d0864a..99aca1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java @@ -194,14 +194,15 @@ public void renderAggregatedLogsBlock(Block html, ViewContext context) { } @Override - public String getApplicationOwner(Path aggregatedLogPath) + public String getApplicationOwner(Path aggregatedLogPath, + ApplicationId appId) throws IOException { return null; } @Override public Map getApplicationAcls( - Path aggregatedLogPath) throws IOException { + Path aggregatedLogPath, ApplicationId appId) throws IOException { return null; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java index 7d0205b..9c02c1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java @@ -55,7 +55,9 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.After; @@ -219,6 +221,25 @@ public boolean isRollover(final FileContext fc, } sysOutStream.reset(); + Configuration factoryConf = new Configuration(conf); + factoryConf.set("yarn.log-aggregation.file-formats", "Indexed"); + factoryConf.set("yarn.log-aggregation.file-controller.Indexed.class", + "org.apache.hadoop.yarn.logaggregation.filecontroller.ifile" + + ".LogAggregationIndexedFileController"); + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(factoryConf); + LogAggregationFileController fileController = factory + .getFileControllerForRead(appId, USER_UGI.getShortUserName()); + Assert.assertTrue(fileController instanceof + LogAggregationIndexedFileController); + foundLogs = fileController.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : logTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + // create a checksum file Path checksumFile = new Path(fileFormat.getRemoteAppLogDir( appId, USER_UGI.getShortUserName()),