diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 9e613227b9..2893870c75 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +59,6 @@
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
@@ -173,7 +173,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   private final CacheTag cacheTag;
   private final Map<Path, PartitionDesc> parts;
 
-  private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
+  private Supplier<FileSystem> fsSupplier;
 
   /**
    * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
@@ -219,7 +219,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
         ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
-    fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf);
+    fsSupplier = getFsSupplier(split.getPath(), jobConf);
     fileKey = determineFileId(fsSupplier, split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
@@ -276,6 +276,17 @@ public Void run() throws Exception {
     });
   }
 
+  private static Supplier<FileSystem> getFsSupplier(final Path path,
+      final Configuration conf) {
+    return () -> {
+      try {
+        return path.getFileSystem(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
   protected Void performDataRead() throws IOException, InterruptedException {
     long startTime = counters.startTimeCounter();
     LlapIoImpl.LOG.info("Processing data for file {}: {}", fileKey, split.getPath());
@@ -479,7 +490,7 @@ private boolean processStop() {
     return true;
   }
 
-  private static Object determineFileId(Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier,
+  private static Object determineFileId(Supplier<FileSystem> fsSupplier,
       FileSplit split, boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic)
       throws IOException {
     if (split instanceof OrcSplit) {
@@ -525,7 +536,7 @@ private void ensureOrcReader() throws IOException {
     }
     LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
     long startTime = counters.startTimeCounter();
-    ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata);
+    ReaderOptions opts = EncodedOrcFile.readerOptions(jobConf).filesystem(fsSupplier).fileMetadata(fileMetadata);
     if (split instanceof OrcSplit) {
       OrcTail orcTail = ((OrcSplit) split).getOrcTail();
       if (orcTail != null) {
@@ -740,7 +751,7 @@ private void ensureRawDataReader(boolean isOpen) throws IOException {
     rawDataReader = RecordReaderUtils.createDefaultDataReader(
         DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize())
         .withCompression(orcReader.getCompressionKind())
-        .withFileSystem(fsSupplier.get()).withPath(path)
+        .withFileSystemSupplier(fsSupplier).withPath(path)
         .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
         .withZeroCopy(useZeroCopy)
         .build());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3deba27dd6..3d30d0948d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4512,11 +4512,6 @@ public static String getPasswdFromKeystore(String keystore, String key) throws I
     return passwd;
   }
 
-  public static SupplierWithCheckedException<FileSystem, IOException> getFsSupplier(final Path path,
-      final Configuration conf) {
-    return () -> path.getFileSystem(conf);
-  }
-
   /**
    * Logs the class paths of the job class loader and the thread context class loader to the passed logger.
    * Checks both loaders if getURLs method is available; if not, prints a message about this (instead of the class path)
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
index 97a1b53c25..1c50612d7f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java
@@ -18,13 +18,49 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
 
+/**
+ * This class is used to extend ReaderOptions with a file system
+ * supplier rather than providing a fully initialized fs object.
+ */
 public class EncodedOrcFile {
+
+  public static class EncodedReaderOptions extends ReaderOptions {
+
+    private Supplier<FileSystem> fileSystemSupplier;
+
+    public EncodedReaderOptions(Configuration configuration) {
+      super(configuration);
+    }
+
+    public EncodedReaderOptions filesystem(Supplier<FileSystem> fsSupplier) {
+      this.fileSystemSupplier = fsSupplier;
+      return this;
+    }
+
+    @Override
+    public EncodedReaderOptions filesystem(FileSystem fs) {
+      this.fileSystemSupplier = () -> fs;
+      return this;
+    }
+
+    @Override
+    public FileSystem getFilesystem() {
+      return fileSystemSupplier != null ? fileSystemSupplier.get() : null;
+    }
+  }
   public static Reader createReader(
       Path path, ReaderOptions options) throws IOException {
     return new ReaderImpl(path, options);
   }
+
+  public static EncodedReaderOptions readerOptions(Configuration conf) {
+    return new EncodedReaderOptions(conf);
+  }
 }
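
For context (not part of the diff): a minimal caller sketch of the supplier-based API introduced above. EncodedOrcFile.readerOptions and EncodedReaderOptions.filesystem(Supplier<FileSystem>) are the methods added by this patch; the class and method names LazyFsExample/lazyOptions are hypothetical.

    import java.io.IOException;
    import java.util.function.Supplier;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
    import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile.EncodedReaderOptions;

    public class LazyFsExample {
      public static EncodedReaderOptions lazyOptions(Path path, Configuration conf) {
        // Wrap the checked IOException, mirroring getFsSupplier() in the patch;
        // path.getFileSystem(conf) only runs when getFilesystem() is first called.
        Supplier<FileSystem> fsSupplier = () -> {
          try {
            return path.getFileSystem(conf);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        };
        return EncodedOrcFile.readerOptions(conf).filesystem(fsSupplier);
      }
    }

This defers FileSystem resolution so that constructing the reader options never triggers an eager (and possibly already-closed) filesystem lookup.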