From 29abd3fc8b34839ab25b2c436116d5f2684ccc88 Mon Sep 17 00:00:00 2001
From: Adam Antal
Date: Thu, 20 Jun 2019 10:27:25 +0200
Subject: [PATCH] YARN-9525. IFile format is not working against s3a remote
 folder

---
 .../LogAggregationIndexedFileController.java | 64 +++++++++++++------
 1 file changed, 43 insertions(+), 21 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 7af58d7a83f..b7c1ca96cee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -44,6 +44,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +111,7 @@
       "indexedFile.fs.retry-interval-ms";
   private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
       "indexedFile.log.roll-over.max-file-size-gb";
+  private static final int LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT = 10;
 
   @VisibleForTesting
   public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
@@ -182,9 +184,14 @@ public Object run() throws Exception {
           indexedLogsMeta.setCompressName(compressName);
         }
         Path aggregatedLogFile = null;
+        Pair<Path, Boolean> initializationResult = null;
+        boolean createdNew;
+
         if (context.isLogAggregationInRolling()) {
-          aggregatedLogFile = initializeWriterInRolling(
+          initializationResult = initializeWriterInRolling(
               remoteLogFile, appId, nodeId);
+          aggregatedLogFile = initializationResult.getLeft();
+          createdNew = initializationResult.getRight();
         } else {
           aggregatedLogFile = remoteLogFile;
           fsDataOStream = fc.create(remoteLogFile,
@@ -195,22 +202,28 @@ public Object run() throws Exception {
           }
           fsDataOStream.write(uuid);
           fsDataOStream.flush();
+          createdNew = true;
         }
 
-        long aggregatedLogFileLength = fc.getFileStatus(
-            aggregatedLogFile).getLen();
-        // append a simple character("\n") to move the writer cursor, so
-        // we could get the correct position when we call
-        // fsOutputStream.getStartPos()
-        final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
-        fsDataOStream.write(dummyBytes);
-        fsDataOStream.flush();
-
-        if (fsDataOStream.getPos() >= (aggregatedLogFileLength
-            + dummyBytes.length)) {
-          currentOffSet = 0;
+        if (!createdNew) {
+          long aggregatedLogFileLength = fc.getFileStatus(
+              aggregatedLogFile).getLen();
+          // append a simple character("\n") to move the writer cursor, so
+          // we could get the correct position when we call
+          // fsOutputStream.getStartPos()
+          final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8"));
+          fsDataOStream.write(dummyBytes);
+          fsDataOStream.flush();
+
+          if (fsDataOStream.getPos() < (aggregatedLogFileLength
+              + dummyBytes.length)) {
+            currentOffSet = fc.getFileStatus(
+                aggregatedLogFile).getLen();
+          } else {
+            currentOffSet = 0;
+          }
         } else {
-          currentOffSet = aggregatedLogFileLength;
+          currentOffSet = 0;
         }
         return null;
       }
@@ -220,8 +233,10 @@ public Object run() throws Exception {
     }
   }
 
-  private Path initializeWriterInRolling(final Path remoteLogFile,
-      final ApplicationId appId, final String nodeId) throws Exception {
+  private Pair<Path, Boolean> initializeWriterInRolling(
+      final Path remoteLogFile, final ApplicationId appId,
+      final String nodeId) throws Exception {
+    boolean createdNew = false;
     Path aggregatedLogFile = null;
     // check uuid
     // if we can not find uuid, we would load the uuid
@@ -281,6 +296,7 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
       // writes the uuid
       fsDataOStream.write(uuid);
       fsDataOStream.flush();
+      createdNew = true;
     } else {
       aggregatedLogFile = currentRemoteLogFile;
       fsDataOStream = fc.create(currentRemoteLogFile,
@@ -289,8 +305,13 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
     }
     // recreate checksum file if needed before aggregate the logs
     if (overwriteCheckSum) {
-      final long currentAggregatedLogFileLength = fc
-          .getFileStatus(aggregatedLogFile).getLen();
+      long currentAggregatedLogFileLength;
+      if (createdNew) {
+        currentAggregatedLogFileLength = 0;
+      } else {
+        currentAggregatedLogFileLength = fc
+            .getFileStatus(aggregatedLogFile).getLen();
+      }
       FSDataOutputStream checksumFileOutputStream = null;
       try {
         checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
@@ -307,7 +328,8 @@ private Path initializeWriterInRolling(final Path remoteLogFile,
         IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
       }
     }
-    return aggregatedLogFile;
+
+    return Pair.of(aggregatedLogFile, createdNew);
   }
 
   @Override
@@ -572,7 +594,6 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
     return findLogs;
   }
 
-  // TODO: fix me if the remote file system does not support append operation.
   @Override
   public List<ContainerLogMeta> readAggregatedLogsMeta(
       ContainerLogsRequest logRequest) throws IOException {
@@ -1144,7 +1165,8 @@ public long getRollOverLogMaxSize(Configuration conf) {
     }
     if (supportAppend) {
       return 1024L * 1024 * 1024 * conf.getInt(
-          LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
+          LOG_ROLL_OVER_MAX_FILE_SIZE_GB,
+          LOG_ROLL_OVER_MAX_FILE_SIZE_GB_DEFAULT);
     } else {
       return 0L;
     }
-- 
2.21.0
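
Illustrative note (not part of the patch): the change above has initializeWriterInRolling report, via a Pair, whether the aggregated log file was created in the current cycle, so callers can skip the getFileStatus()-based offset probing for a brand-new file; on an object store such as s3a a file being written is generally not visible until its stream is closed. A minimal self-contained sketch of that idea, using hypothetical names (RollingInitSketch, initialize, chooseStartOffset) rather than the actual YARN classes:

    import org.apache.commons.lang3.tuple.Pair;

    /** Hypothetical sketch of the "createdNew" idea; not the actual YARN code. */
    public final class RollingInitSketch {

      /** Mimics the new return type: the target path plus a created-new flag. */
      static Pair<String, Boolean> initialize(boolean alreadyExists, String path) {
        // In the real controller this is where the file is created or reopened.
        return Pair.of(path, !alreadyExists);
      }

      /** Only probe the file length when the file existed before this cycle. */
      static long chooseStartOffset(boolean createdNew, long knownLength) {
        if (createdNew) {
          // A file created moments ago may not be visible to a status call on
          // s3a until the stream is closed, so start from offset 0 directly.
          return 0L;
        }
        return knownLength;
      }

      public static void main(String[] args) {
        Pair<String, Boolean> fresh = initialize(false, "aggregated.log");
        System.out.println(chooseStartOffset(fresh.getRight(), 0L));      // prints 0
        Pair<String, Boolean> reused = initialize(true, "aggregated.log");
        System.out.println(chooseStartOffset(reused.getRight(), 4096L));  // prints 4096
      }
    }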