From cd85a46d78ad1d5d71cb7e3cf4133f6f25fc6476 Mon Sep 17 00:00:00 2001 From: Panos Garefalakis Date: Thu, 7 May 2020 14:38:50 +0100 Subject: [PATCH] Extending LlapInputFormat to check/set random IO input setting for S3 Change-Id: I0ba3a4c5af7d972654e8e8d34014c31203ec1816 --- .../hadoop/hive/llap/io/api/impl/LlapInputFormat.java | 11 +++++++++++ .../org/apache/hadoop/hive/ql/io/HiveInputFormat.java | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index ac1aca88668..513e9a7fa94 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hive.llap.io.api.impl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; import org.apache.hadoop.hive.conf.HiveConf; @@ -62,6 +65,9 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hive.common.util.HiveStringUtils; +import static org.apache.hadoop.hive.common.FileUtils.isS3a; +import static org.apache.hadoop.hive.ql.io.HiveInputFormat.isRandomAccessInputFormat; + public class LlapInputFormat implements InputFormat, VectorizedInputFormatInterface, SelfDescribingInputFormatInterface, AvoidSplitCombination { @@ -100,6 +106,11 @@ FileSplit fileSplit = (FileSplit) split; reporter.setStatus(fileSplit.toString()); + FileSystem splitFileSystem = fileSplit.getPath().getFileSystem(job); + if (isS3a(splitFileSystem) && isRandomAccessInputFormat(sourceInputFormat)) { + LlapIoImpl.LOG.info("Changing S3A input policy to RANDOM"); + ((S3AFileSystem) 
splitFileSystem).setInputPolicy(S3AInputPolicy.Random); + } try { // At this entry point, we are going to assume that these are logical table columns. // Perhaps we should go thru the code and clean this up to be more explicit; for now, we diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 218d6651af0..f04330831da 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -386,7 +386,7 @@ public static boolean canInjectCaches(Class clazz) { * @param inputFormat * @return */ - private static boolean isRandomAccessInputFormat(InputFormat inputFormat) { + public static boolean isRandomAccessInputFormat(InputFormat inputFormat) { if (inputFormat instanceof OrcInputFormat || inputFormat instanceof VectorizedParquetInputFormat) { return true; @@ -449,7 +449,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, FileSystem splitFileSystem = splitPath.getFileSystem(job); if (isS3a(splitFileSystem) && isRandomAccessInputFormat(inputFormat)) { - LOG.debug("Changing S3A input policy to RANDOM for split {}", splitPath); + LOG.info("Changing S3A input policy to RANDOM"); ((S3AFileSystem) splitFileSystem).setInputPolicy(S3AInputPolicy.Random); } -- 2.20.1 (Apple Git-117)