From ed45bd7c6e52cab9471debf3ade3e51ba5506e02 Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Wed, 19 Jun 2019 16:22:39 +0200 Subject: [PATCH] YARN-9607. Auto-configuring rollover-size of IFile format for non-appendable filesystems --- .../LogAggregationFileController.java | 5 --- .../LogAggregationIndexedFileController.java | 34 ++++++++++++------- ...stLogAggregationIndexedFileController.java | 28 +++++++++++++++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 001f4f53e7faab68483f535199a8fb0d15fffe29..ec633d6de4194a4a2a00340e5a00c4d658d6a534 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -99,11 +99,6 @@ protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission .createImmutable((short) (0640 ^ 0777)); - // This is temporary solution. The configuration will be deleted once we have - // the FileSystem API to check whether append operation is supported or not. - public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND - = YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append"; - protected Configuration conf; protected Path remoteRootLogDir; protected String remoteRootLogDirSuffix; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index a27d809904d5fa5172901c4a2a223af2a2e6b241..96e2d5c781b2d4e86243adb0f4a91eb73127b390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.File; @@ -28,6 +29,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.net.URI; import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -47,12 +49,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -135,16 +140,6 @@ public LogAggregationIndexedFileController() {} @Override public void initInternal(Configuration conf) { - // Currently, we need the underlying File System to support append - // operation. Will remove this check after we finish - // LogAggregationIndexedFileController for non-append mode. - boolean append = conf.getBoolean(LOG_AGGREGATION_FS_SUPPORT_APPEND, true); - if (!append) { - throw new YarnRuntimeException("The configuration:" - + LOG_AGGREGATION_FS_SUPPORT_APPEND + " is set as False. We can only" - + " use LogAggregationIndexedFileController when the FileSystem " - + "support append operations."); - } String compressName = conf.get( YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE); @@ -1139,8 +1134,23 @@ public static int getFSInputBufferSize(Configuration conf) { @Private @VisibleForTesting public long getRollOverLogMaxSize(Configuration conf) { - return 1024L * 1024 * 1024 * conf.getInt( - LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); + boolean supportAppend = false; + try { + FileSystem fs = FileSystem.get(remoteRootLogDir.toUri(), conf); + if (fs instanceof LocalFileSystem || fs.hasPathCapability( + remoteRootLogDir, CommonPathCapabilities.FS_APPEND)) { + supportAppend = true; + } + } catch (Exception ioe) { + LOG.warn("Unable to determine if the filesystem supports " + + "append operation", ioe); + } + if (supportAppend) { + return 1024L * 1024 * 1024 * conf.getInt( + LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10); + } else { + return 0L; + } } private abstract class FSAction { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java index 098f3be4ce7741113853b635b03190a67e585f0a..73351813e7108a5555659d49a4d57093f291bf7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java @@ -469,4 +469,32 @@ private File createAndWriteLocalLogFile(Path localLogDir, String logType, private String logMessage(ContainerId containerId, String logType) { return "Hello " + containerId + " in " + logType + "!"; } + + @Test + public void testGetRollOverLogMaxSize() { + String fileControllerName = "testController"; + String remoteDirConf = String.format( + YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + fileControllerName); + Configuration conf = new Configuration(); + LogAggregationIndexedFileController fileFormat + = new LogAggregationIndexedFileController(); + long defaultRolloverSize = 10L * 1024 * 1024 * 1024; + + // test local filesystem + fileFormat.initialize(conf, fileControllerName); + assertThat(fileFormat.getRollOverLogMaxSize(conf)) + .isEqualTo(defaultRolloverSize); + + // test file system supporting append + conf.set(remoteDirConf, "webhdfs://localhost/path"); + fileFormat.initialize(conf, fileControllerName); + assertThat(fileFormat.getRollOverLogMaxSize(conf)) + .isEqualTo(defaultRolloverSize); + + // test file system not supporting append + conf.set(remoteDirConf, "s3a://test/path"); + fileFormat.initialize(conf, fileControllerName); + assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero(); + } } -- 2.21.0