diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index e6d8b7ab3e..abd66d0ab1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
@@ -167,7 +168,8 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   private CompressionCodec codec;
   private Object fileKey;
   private final String cacheTag;
-  private FileSystem fs;
+
+  private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
 
   /**
    * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
@@ -211,8 +213,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
         ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fs = split.getPath().getFileSystem(jobConf);
-    fileKey = determineFileId(fs, split,
+    fileKey = determineFileId(Utilities.getFsSupplier(split.getPath(), jobConf), split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
         !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)
@@ -472,7 +473,8 @@ private boolean processStop() {
     return true;
   }
 
-  private static Object determineFileId(FileSystem fs, FileSplit split,
+  private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier,
+      FileSplit split,
       boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
     if (split instanceof OrcSplit) {
       Object fileKey = ((OrcSplit)split).getFileKey();
@@ -481,7 +483,7 @@ private static Object determineFileId(FileSystem fs, FileSplit split,
       }
     }
     LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
-    return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
+    return HdfsUtils.getFileId(fsSupplier, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
   }
 
   /**
@@ -513,11 +515,11 @@ private void ensureOrcReader() throws IOException {
     path = split.getPath();
     if (fileKey instanceof Long && HiveConf.getBoolVar(
         daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
-      path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
+      path = HdfsUtils.getFileIdPath(fsSupplier, path, (long)fileKey);
     }
     LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
     long startTime = counters.startTimeCounter();
-    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata);
+    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata);
     if (split instanceof OrcSplit) {
       OrcTail orcTail = ((OrcSplit) split).getOrcTail();
       if (orcTail != null) {
@@ -732,7 +734,7 @@ private void ensureRawDataReader(boolean isOpen) throws IOException {
     rawDataReader = RecordReaderUtils.createDefaultDataReader(
         DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize())
         .withCompression(orcReader.getCompressionKind())
-        .withFileSystem(fs).withPath(path)
+        .withFileSystem(fsSupplier.get()).withPath(path)
         .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
         .withZeroCopy(useZeroCopy)
         .build());
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index a5671e9682..7ed760c8f7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.llap.io.encoded;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.orc.impl.MemoryManager;
 
 import java.io.IOException;
@@ -150,7 +151,6 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
 
   private final Object fileKey;
   private final String cacheTag;
-  private final FileSystem fs;
 
   private AtomicBoolean isStopped = new AtomicBoolean(false);
   private final Deserializer sourceSerDe;
@@ -213,8 +213,7 @@ public MemoryBuffer create() {
       throw new RuntimeException(e);
     }
 
-    fs = split.getPath().getFileSystem(daemonConf);
-    fileKey = determineFileId(fs, split,
+    fileKey = determineFileId(Utilities.getFsSupplier(split.getPath(), daemonConf), split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
         !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
@@ -1698,13 +1697,13 @@ private boolean processStop() {
     return true;
   }
 
-  private static Object determineFileId(FileSystem fs, FileSplit split,
+  private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier, FileSplit split,
       boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
     /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
      Object fileKey = ((OrcSplit)split).getFileKey();
      if (fileKey != null) return fileKey; */
     LlapIoImpl.LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
-    return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
+    return HdfsUtils.getFileId(fsSupplier, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 61e34308bc..88a92abc2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -256,6 +256,11 @@
   private static final Object INPUT_SUMMARY_LOCK = new Object();
   private static final Object ROOT_HDFS_DIR_LOCK = new Object();
 
+  @FunctionalInterface
+  public interface SupplierWithCheckedException<T, X extends Exception> {
+    T get() throws X;
+  }
+
   /**
    * ReduceField:
    * KEY: record key
@@ -4589,4 +4594,9 @@ public static String getPasswdFromKeystore(String keystore, String key) throws I
     }
     return passwd;
   }
+
+  public static SupplierWithCheckedException<FileSystem, IOException> getFsSupplier(final Path path,
+      final Configuration conf) {
+    return () -> path.getFileSystem(conf);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 3482cfce36..7b8af37a65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,8 +45,9 @@
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
   private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class);
 
-  public static Object getFileId(FileSystem fileSystem, Path path,
+  public static Object getFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier, Path path,
       boolean allowSynthetic, boolean checkDefaultFs, boolean forceSyntheticIds) throws IOException {
+    FileSystem fileSystem = fsSupplier.get();
     if (forceSyntheticIds == false && fileSystem instanceof DistributedFileSystem) {
       DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
       if ((!checkDefaultFs) || isDefaultFs(dfs)) {
@@ -100,7 +102,7 @@ public static long createTestFileId(
   private static String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/";
 
   public static Path getFileIdPath(
-      FileSystem fileSystem, Path path, long fileId) {
-    return ((fileSystem instanceof DistributedFileSystem))
+      Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier, Path path, long fileId) throws IOException {
+    return (fsSupplier.get() instanceof DistributedFileSystem)
         ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index fd776cf978..9c05afa62d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -191,7 +191,7 @@ public void initialize(
     String cacheTag = null;
     // TODO: also support fileKey in splits, like OrcSplit does
     if (metadataCache != null) {
-      cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
+      cacheKey = HdfsUtils.getFileId(Utilities.getFsSupplier(file, configuration), file,
          HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
          HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
          !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
@@ -201,10 +201,9 @@ public void initialize(
       cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true);
     }
     // If we are going to use cache, change the path to depend on file ID for extra consistency.
-    FileSystem fs = file.getFileSystem(configuration);
     if (cacheKey instanceof Long && HiveConf.getBoolVar(
         cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
-      file = HdfsUtils.getFileIdPath(fs, file, (long)cacheKey);
+      file = HdfsUtils.getFileIdPath(Utilities.getFsSupplier(file, configuration), file, (long)cacheKey);
     }
   }
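
For reference, a minimal standalone sketch of how the new supplier is meant to be used, outside the patched readers (class name and path below are illustrative, not part of the patch): Utilities.getFsSupplier only captures the Path and Configuration, and each get() call goes through Path.getFileSystem, i.e. the Hadoop FileSystem cache, instead of holding on to a FileSystem reference that Tez may have closed.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;

// Illustrative only: demonstrates the deferred-FileSystem pattern this patch applies in the LLAP readers.
public class FsSupplierSketch {
  public static void main(String[] args) throws IOException {
    Path path = new Path("/tmp/example.orc"); // hypothetical path
    Configuration conf = new Configuration();

    // Nothing is resolved here; the FileSystem lookup is deferred until get() is called.
    Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier =
        Utilities.getFsSupplier(path, conf);

    // Only code paths that actually need the FileSystem pay for resolving it, and each call
    // goes through the Hadoop FS cache rather than a field cached by the reader.
    FileSystem fs = fsSupplier.get();
    System.out.println(fs.getUri());
  }
}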