diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index e6d8b7ab3e..6a00220f6d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; @@ -167,7 +168,8 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private CompressionCodec codec; private Object fileKey; private final String cacheTag; - private FileSystem fs; + + private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier; /** * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read. @@ -211,8 +213,8 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null; // 1. Get file metadata from cache, or create the reader and read it. 
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that - fs = split.getPath().getFileSystem(jobConf); - fileKey = determineFileId(fs, split, + fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf); + fileKey = determineFileId(fsSupplier, split, HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID), !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH) @@ -472,7 +474,8 @@ private boolean processStop() { return true; } - private static Object determineFileId(FileSystem fs, FileSplit split, + private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier, + FileSplit split, boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException { if (split instanceof OrcSplit) { Object fileKey = ((OrcSplit)split).getFileKey(); @@ -481,7 +484,7 @@ private static Object determineFileId(FileSystem fs, FileSplit split, } } LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID"); - return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic); + return HdfsUtils.getFileId(fsSupplier.get(), split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic); } /** @@ -513,11 +516,11 @@ private void ensureOrcReader() throws IOException { path = split.getPath(); if (fileKey instanceof Long && HiveConf.getBoolVar( daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) { - path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey); + path = HdfsUtils.getFileIdPath(fsSupplier.get(), path, (long)fileKey); } LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath()); long startTime = counters.startTimeCounter(); - ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fs).fileMetadata(fileMetadata); + ReaderOptions opts = 
OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata); if (split instanceof OrcSplit) { OrcTail orcTail = ((OrcSplit) split).getOrcTail(); if (orcTail != null) { @@ -732,7 +735,7 @@ private void ensureRawDataReader(boolean isOpen) throws IOException { rawDataReader = RecordReaderUtils.createDefaultDataReader( DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize()) .withCompression(orcReader.getCompressionKind()) - .withFileSystem(fs).withPath(path) + .withFileSystem(fsSupplier.get()).withPath(path) .withTypeCount(orcReader.getSchema().getMaximumId() + 1) .withZeroCopy(useZeroCopy) .build()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 4e621a4a40..172bb73305 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -255,6 +255,11 @@ private static final Object INPUT_SUMMARY_LOCK = new Object(); private static final Object ROOT_HDFS_DIR_LOCK = new Object(); + @FunctionalInterface + public interface SupplierWithCheckedException<T, X extends Exception> { + T get() throws X; + } + /** * ReduceField: * KEY: record key @@ -4602,4 +4607,9 @@ public static String getPasswdFromKeystore(String keystore, String key) throws I } return passwd; } + + public static SupplierWithCheckedException<FileSystem, IOException> getFsSupplier(final Path path, + final Configuration conf) { + return () -> path.getFileSystem(conf); + } }