diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 9e613227b9..2893870c75 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -58,7 +59,6 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; @@ -173,7 +173,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private final CacheTag cacheTag; private final Map parts; - private Utilities.SupplierWithCheckedException fsSupplier; + private Supplier fsSupplier; /** * stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read. @@ -219,7 +219,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null; // 1. Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that - fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf); + fsSupplier = getFsSupplier(split.getPath(), jobConf); fileKey = determineFileId(fsSupplier, split, HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID), @@ -276,6 +276,17 @@ public Void run() throws Exception { }); } + private static Supplier getFsSupplier(final Path path, + final Configuration conf) { + return () -> { + try { + return path.getFileSystem(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + protected Void performDataRead() throws IOException, InterruptedException { long startTime = counters.startTimeCounter(); LlapIoImpl.LOG.info("Processing data for file {}: {}", fileKey, split.getPath()); @@ -479,7 +490,7 @@ private boolean processStop() { return true; } - private static Object determineFileId(Utilities.SupplierWithCheckedException fsSupplier, + private static Object determineFileId(Supplier fsSupplier, FileSplit split, boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException { if (split instanceof OrcSplit) { @@ -525,7 +536,7 @@ private void ensureOrcReader() throws IOException { } LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath()); long startTime = counters.startTimeCounter(); - ReaderOptions opts = OrcFile.readerOptions(jobConf).filesystem(fsSupplier.get()).fileMetadata(fileMetadata); + ReaderOptions opts = EncodedOrcFile.readerOptions(jobConf).filesystem(fsSupplier).fileMetadata(fileMetadata); if (split instanceof OrcSplit) { OrcTail orcTail = ((OrcSplit) split).getOrcTail(); if (orcTail != null) { @@ -740,7 +751,7 @@ private void ensureRawDataReader(boolean isOpen) throws IOException { rawDataReader = RecordReaderUtils.createDefaultDataReader( DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize()) .withCompression(orcReader.getCompressionKind()) - .withFileSystem(fsSupplier.get()).withPath(path) + .withFileSystemSupplier(fsSupplier).withPath(path) .withTypeCount(orcReader.getSchema().getMaximumId() + 1) .withZeroCopy(useZeroCopy) .build()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 3deba27dd6..3d30d0948d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4512,11 +4512,6 @@ public static String getPasswdFromKeystore(String keystore, String key) throws I return passwd; } - public static SupplierWithCheckedException getFsSupplier(final Path path, - final Configuration conf) { - return () -> path.getFileSystem(conf); - } - /** * Logs the class paths of the job class loader and the thread context class loader to the passed logger. * Checks both loaders if getURLs method is available; if not, prints a message about this (instead of the class path) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java index 97a1b53c25..3e0894e8c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedOrcFile.java @@ -18,13 +18,53 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import java.io.IOException; +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; +/** + * Factory for encoded ORC readers and options + */ public class EncodedOrcFile { + + /** + * Extends ReaderOptions to accept a file system supplier + * instead of a fully initialized fs object + */ + public static class EncodedReaderOptions extends ReaderOptions { + + private Supplier fileSystemSupplier; + + public EncodedReaderOptions(Configuration configuration) { + super(configuration); + } + + public EncodedReaderOptions filesystem(Supplier fsSupplier) { + this.fileSystemSupplier = fsSupplier; + return this; + } + + @Override + public EncodedReaderOptions filesystem(FileSystem fs) { + this.fileSystemSupplier = () -> fs; + return this; + } + + @Override + public FileSystem getFilesystem() { + return fileSystemSupplier != null ? fileSystemSupplier.get() : null; + } + } + public static Reader createReader( Path path, ReaderOptions options) throws IOException { return new ReaderImpl(path, options); } + + public static EncodedReaderOptions readerOptions(Configuration conf) { + return new EncodedReaderOptions(conf); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java new file mode 100644 index 0000000000..e25322ede9 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedOrcFile.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.hive.ql.io.orc.encoded; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hive.common.util.MockFileSystem; +import org.apache.orc.CompressionKind; +import org.apache.orc.FileMetadata; +import org.apache.orc.OrcProto; +import org.apache.orc.impl.OrcTail; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestEncodedOrcFile { + + @Test + public void testFileSystemIsNotInitializedWithKnownTail() throws IOException { + JobConf conf = new JobConf(); + Path path = new Path("fmock:///testtable/bucket_0"); + conf.set("hive.orc.splits.include.file.footer", "true"); + conf.set("fs.defaultFS", "fmock:///"); + conf.set("fs.mock.impl", FailingMockFileSystem.class.getName()); + + List types = new ArrayList<>(); + types.add(OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.BINARY).build()); + FileMetadata dummyMetadata = mock(FileMetadata.class); + when(dummyMetadata.getTypes()).thenReturn(types); + when(dummyMetadata.getCompressionKind()).thenReturn(CompressionKind.NONE); + OrcFile.ReaderOptions readerOptions = EncodedOrcFile.readerOptions(conf).filesystem(() -> { + throw new RuntimeException("Filesystem should not have been initialized"); + }).orcTail(new OrcTail(OrcProto.FileTail.getDefaultInstance(), null)).fileMetadata(dummyMetadata); + + // an orc reader is created, this should not cause filesystem initialization + // because orc tail is already provided and we are not making any real reads. + Reader reader = EncodedOrcFile.createReader(path, readerOptions); + + // Following initiates the creation of data reader in ORC reader. This should + // not cause file system initialization either as we are still not making any + // real read. + reader.rows(); + } + + private static class FailingMockFileSystem extends MockFileSystem { + @Override + public void initialize(URI uri, Configuration conf) { + throw new RuntimeException("Filesystem should not have been initialized"); + } + } +}