From d4da9a18af4a03f81afef491cb4781902ef5ba9b Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Tue, 4 Jun 2019 14:41:20 +0200 Subject: [PATCH] POCv2 --- .../LogAggregationIndexedFileController.java | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index a27d809904d..caed6540c42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -44,6 +44,7 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.commons.lang3.SerializationUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -190,9 +191,14 @@ public Object run() throws Exception { indexedLogsMeta.setCompressName(compressName); } Path aggregatedLogFile = null; + Pair initializationResult = null; + boolean createdNew; + if (context.isLogAggregationInRolling()) { - aggregatedLogFile = initializeWriterInRolling( + initializationResult = initializeWriterInRolling( remoteLogFile, appId, nodeId); + aggregatedLogFile = initializationResult.getLeft(); + createdNew = initializationResult.getRight(); } else { aggregatedLogFile = remoteLogFile; fsDataOStream = fc.create(remoteLogFile, @@ -203,22 +209,14 @@ public Object run() throws Exception { } fsDataOStream.write(uuid); fsDataOStream.flush(); + createdNew = true; } - long aggregatedLogFileLength = fc.getFileStatus( - aggregatedLogFile).getLen(); - // append a simple character("\n") to move the writer cursor, so - // we could get the correct position when we call - // fsOutputStream.getStartPos() - final byte[] dummyBytes = "\n".getBytes(Charset.forName("UTF-8")); - fsDataOStream.write(dummyBytes); - fsDataOStream.flush(); - - if (fsDataOStream.getPos() >= (aggregatedLogFileLength - + dummyBytes.length)) { + if (createdNew) { currentOffSet = 0; } else { - currentOffSet = aggregatedLogFileLength; + currentOffSet = fc.getFileStatus( + aggregatedLogFile).getLen(); } return null; } @@ -228,8 +226,10 @@ public Object run() throws Exception { } } - private Path initializeWriterInRolling(final Path remoteLogFile, - final ApplicationId appId, final String nodeId) throws Exception { + private Pair initializeWriterInRolling( + final Path remoteLogFile, final ApplicationId appId, + final String nodeId) throws Exception { + boolean createdNew = false; Path aggregatedLogFile = null; // check uuid // if we can not find uuid, we would load the uuid @@ -289,6 +289,7 @@ private Path initializeWriterInRolling(final Path remoteLogFile, // writes the uuid fsDataOStream.write(uuid); fsDataOStream.flush(); + createdNew = true; } else { aggregatedLogFile = currentRemoteLogFile; fsDataOStream = fc.create(currentRemoteLogFile, @@ -297,8 +298,13 @@ private Path initializeWriterInRolling(final Path remoteLogFile, } // recreate checksum file if needed before aggregate the logs if (overwriteCheckSum) { - final long currentAggregatedLogFileLength = fc - .getFileStatus(aggregatedLogFile).getLen(); + long currentAggregatedLogFileLength; + if (createdNew) { + currentAggregatedLogFileLength = 0; + } else { + currentAggregatedLogFileLength = fc + .getFileStatus(aggregatedLogFile).getLen(); + } FSDataOutputStream checksumFileOutputStream = null; try { checksumFileOutputStream = fc.create(remoteLogCheckSumFile, @@ -315,7 +321,8 @@ private Path initializeWriterInRolling(final Path remoteLogFile, IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream); } } - return aggregatedLogFile; + + return Pair.of(aggregatedLogFile, createdNew); } @Override -- 2.21.0