diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a120b4573d..d43fe22146 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2664,6 +2664,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "are not hidden by the INSERT OVERWRITE."),
     HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
         "Whether Hive supports transactional stats (accurate stats for transactional tables)"),
+    HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration",
+        120, "Enable dir cache for ACID tables, specified in minutes. " +
+        "0 indicates that the cache is disabled."),
     HIVE_TXN_READONLY_ENABLED("hive.txn.readonly.enabled", false,
         "Enables read-only transaction classification and related optimizations"),
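A minimal sketch of how the new setting could be read and toggled from code, assuming the standard HiveConf accessors (the property can equally be set in hive-site.xml); the class name below is illustrative only:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class DirCacheConfigExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Default from this patch: cache entries expire 120 minutes after they are written.
    int duration = HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
    System.out.println("ACID dir cache duration (minutes): " + duration);

    // Setting the value to 0 disables the cache; getAcidStateFromCache() then
    // simply delegates to the regular AcidUtils.getAcidState() path.
    conf.setIntVar(ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION, 0);
  }
}
```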
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2f5ec5270c..663eb8f207 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -35,10 +35,15 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -163,6 +168,9 @@ public boolean accept(Path path) {
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
 
+  private static Cache<String, DirInfoValue> dirCache;
+  private static AtomicBoolean dirCacheInited = new AtomicBoolean();
+
   /**
    * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1
    * (Unless via Load Data statement)
@@ -518,6 +526,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) {
     private final List<Path> obsolete;
     private final List<ParsedDelta> deltas;
     private final Path base;
+    private List<HdfsFileStatusWithId> baseFiles;
 
     public DirectoryImpl(List<Path> abortedDirectories,
         boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
@@ -536,6 +545,14 @@ public Path getBaseDirectory() {
       return base;
     }
 
+    public List<HdfsFileStatusWithId> getBaseFiles() {
+      return baseFiles;
+    }
+
+    void setBaseFiles(List<HdfsFileStatusWithId> baseFiles) {
+      this.baseFiles = baseFiles;
+    }
+
     @Override
     public boolean isBaseInRawFormat() {
       return isBaseInRawFormat;
     }
@@ -815,6 +832,9 @@ public String toString() {
      * @return the base directory to read
      */
     Path getBaseDirectory();
+
+    List<HdfsFileStatusWithId> getBaseFiles();
+
     boolean isBaseInRawFormat();
 
     /**
@@ -3075,4 +3095,152 @@ public static TxnType getTxnType(Configuration conf, ASTNode tree) {
         astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
         TxnType.READ_ONLY : TxnType.DEFAULT;
   }
+
+  public static List<HdfsFileStatusWithId> findBaseFiles(
+      Path base, Ref<Boolean> useFileIds, Supplier<FileSystem> fs) throws IOException {
+    Boolean val = useFileIds.value;
+    if (val == null || val) {
+      try {
+        List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
+            fs.get(), base, AcidUtils.hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true; // The call succeeded, so presumably the API is there.
+        }
+        return result;
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
+      }
+    }
+
+    // Fall back to regular API and create states without ID.
+    List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(), base, AcidUtils.hiddenFileFilter);
+    List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
+    for (FileStatus child : children) {
+      result.add(AcidUtils.createOriginalObj(null, child));
+    }
+    return result;
+  }
+
+  private static void initDirCache(int durationInMts) {
+    if (dirCacheInited.get()) {
+      LOG.debug("Dir cache is already initialized");
+      return;
+    }
+    dirCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(durationInMts, TimeUnit.MINUTES)
+        .softValues()
+        .build();
+    dirCacheInited.set(true);
+  }
+
+  /**
+   * Tries to get directory details from the cache. For now, an entry is cached
+   * only when a base directory is present and no deltas exist. This should
+   * be used only with the BI split strategy and only for ACID tables.
+   *
+   * @param fileSystem file system supplier
+   * @param candidateDirectory the partition directory to analyze
+   * @param conf the configuration
+   * @param writeIdList the list of write ids that we are reading
+   * @param useFileIds whether file IDs should be fetched along with the file statuses
+   * @param ignoreEmptyFiles whether empty files should be skipped
+   * @param tblproperties table properties
+   * @param generateDirSnapshots whether directory snapshots should be generated
+   * @return directory state
+   * @throws IOException on errors
+   */
+  public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
+      Path candidateDirectory, Configuration conf,
+      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
+      Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+
+    int dirCacheDuration = HiveConf.getIntVar(conf,
+        ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
+
+    if (dirCacheDuration <= 0) {
+      LOG.debug("Dir cache is not enabled");
+      return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
+          useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots);
+    } else {
+      initDirCache(dirCacheDuration);
+    }
+
+    /*
+     * Only the simple case is cached: a base directory is present and there
+     * are no deltas. When anything changes, the entry is invalidated based
+     * on the open/aborted write id list.
+     */
+    // dbName + tableName + dir
+    String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
+    DirInfoValue value = dirCache.getIfPresent(key);
+
+    // in case of open/aborted txns, recompute dirInfo
+    long[] exceptions = writeIdList.getInvalidWriteIds();
+    boolean recompute = (exceptions != null && exceptions.length > 0);
+
+    if (recompute) {
+      LOG.info("Invalidating cache entry for key: {}", key);
+      dirCache.invalidate(key);
+      value = null;
+    }
+
+    if (value != null) {
+      // double check writeIds
+      if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("writeIdList: {} does not match cached value: {} "
+              + "for key: {}", writeIdList.writeToString(),
+              value.getTxnString(), key);
+        }
+        recompute = true;
+      }
+    }
+
+    // compute and add to cache
+    if (recompute || (value == null)) {
+      Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
+          writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
+          generateDirSnapshots);
+      value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
+
+      if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
+          && value.dirInfo.getCurrentDirectories().isEmpty()) {
+        populateBaseFiles(dirInfo, useFileIds, fileSystem);
+        dirCache.put(key, value);
+      }
+    } else {
+      LOG.info("Got {} from cache, cache size: {}", key, dirCache.size());
+    }
+    return value.getDirInfo();
+  }
+
+  private static void populateBaseFiles(Directory dirInfo,
+      Ref<Boolean> useFileIds, Supplier<FileSystem> fileSystem) throws IOException {
+    if (dirInfo.getBaseDirectory() != null) {
+      // Cache base directory contents
+      List<HdfsFileStatusWithId> children = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fileSystem);
+      ((DirectoryImpl) dirInfo).setBaseFiles(children);
+    }
+  }
+
+  static class DirInfoValue {
+    private String txnString;
+    private AcidUtils.Directory dirInfo;
+
+    DirInfoValue(String txnString, AcidUtils.Directory dirInfo) {
+      this.txnString = txnString;
+      this.dirInfo = dirInfo;
+    }
+
+    String getTxnString() {
+      return txnString;
+    }
+
+    AcidUtils.Directory getDirInfo() {
+      return dirInfo;
+    }
+  }
 }
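To make the caching contract above easier to follow in isolation, here is a small self-contained sketch of the same idea: a Guava cache keyed by `tableName + "_" + directory`, expiring after a configurable number of minutes, invalidated whenever open/aborted write ids are seen, and double-checked against the write id string before reuse. The `DirSnapshot` type and the `lookup` helper are illustrative stand-ins, not part of the patch:

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class DirCacheSketch {

  /** Hypothetical stand-in for the cached directory listing. */
  static class DirSnapshot {
    final String writeIdString;

    DirSnapshot(String writeIdString) {
      this.writeIdString = writeIdString;
    }
  }

  private final Cache<String, DirSnapshot> cache;

  DirCacheSketch(int durationInMinutes) {
    // Mirrors initDirCache(): time-based expiry plus soft values so the GC
    // can reclaim entries under memory pressure.
    this.cache = CacheBuilder.newBuilder()
        .expireAfterWrite(durationInMinutes, TimeUnit.MINUTES)
        .softValues()
        .build();
  }

  DirSnapshot lookup(String tableName, String dir, String writeIdString,
      long[] invalidWriteIds) {
    String key = tableName + "_" + dir;

    // Any open/aborted write id means the cached listing may be stale: drop it.
    if (invalidWriteIds != null && invalidWriteIds.length > 0) {
      cache.invalidate(key);
    }

    DirSnapshot cached = cache.getIfPresent(key);
    // The write id string is double-checked so that a snapshot taken under a
    // different ValidWriteIdList is never reused.
    if (cached != null && cached.writeIdString.equals(writeIdString)) {
      return cached;
    }

    // In the real code this is where getAcidState() recomputes the listing.
    DirSnapshot fresh = new DirSnapshot(writeIdString);
    cache.put(key, fresh);
    return fresh;
  }
}
```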
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 76984abd0a..e891dd6f84 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -1209,7 +1210,7 @@ public String toString() {
    */
  static final class FileGenerator implements Callable<AcidDirInfo> {
     private final Context context;
-    private final FileSystem fs;
+    private final Supplier<FileSystem> fs;
     /**
      * For plain or acid tables this is the root of the partition (or table if not partitioned).
      * For MM table this is delta/ or base/ dir.  In MM case applying of the ValidTxnList that
@@ -1221,12 +1222,12 @@ public String toString() {
     private final UserGroupInformation ugi;
 
     @VisibleForTesting
-    FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
+    FileGenerator(Context context, Supplier<FileSystem> fs, Path dir, boolean useFileIds,
         UserGroupInformation ugi) {
       this(context, fs, dir, Ref.from(useFileIds), ugi);
     }
 
-    FileGenerator(Context context, FileSystem fs, Path dir, Ref<Boolean> useFileIds,
+    FileGenerator(Context context, Supplier<FileSystem> fs, Path dir, Ref<Boolean> useFileIds,
         UserGroupInformation ugi) {
       this.context = context;
       this.fs = fs;
@@ -1252,6 +1253,17 @@ public AcidDirInfo run() throws Exception {
       }
     }
 
+    private Directory getAcidState() throws IOException {
+      if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) {
+        return AcidUtils.getAcidStateFromCache(fs, dir, context.conf,
+            context.writeIdList, useFileIds, true, null, true);
+      } else {
+        return AcidUtils.getAcidState(fs.get(), dir, context.conf, context.writeIdList,
+            useFileIds, true, null, true);
+      }
+    }
+
     private AcidDirInfo callInternal() throws IOException {
       if (context.acidOperationalProperties != null
           && context.acidOperationalProperties.isInsertOnly()) {
@@ -1264,16 +1276,15 @@ private AcidDirInfo callInternal() throws IOException {
             context.conf.getBoolean("mapred.input.dir.recursive", false));
         List<HdfsFileStatusWithId> originals = new ArrayList<>();
         List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
-        AcidUtils.findOriginals(fs, dir, originals, useFileIds, true, isRecursive);
+        AcidUtils.findOriginals(fs.get(), dir, originals, useFileIds, true, isRecursive);
         for (HdfsFileStatusWithId fileId : originals) {
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
-        return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
+        return new AcidDirInfo(fs.get(), dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
            Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>());
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
+      AcidUtils.Directory dirInfo = getAcidState();
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
@@ -1282,7 +1293,10 @@ private AcidDirInfo callInternal() throws IOException {
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
       } else {
-        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
+        List<HdfsFileStatusWithId> compactedBaseFiles = dirInfo.getBaseFiles();
+        if (compactedBaseFiles == null) {
+          compactedBaseFiles = AcidUtils.findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fs);
+        }
         for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
           baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
               AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
@@ -1323,7 +1337,7 @@ private AcidDirInfo callInternal() throws IOException {
         if (val == null || val) {
           try {
             List<HdfsFileStatusWithId> insertDeltaFiles =
-                SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
+                SHIMS.listLocatedHdfsStatus(fs.get(), parsedDelta.getPath(), bucketFilter);
             for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
               baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
             }
@@ -1339,7 +1353,8 @@ private AcidDirInfo callInternal() throws IOException {
             }
           }
           // Fall back to regular API and create statuses without ID.
-          List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
+          List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(),
+              parsedDelta.getPath(), bucketFilter);
           for (FileStatus child : children) {
             HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
             baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
@@ -1358,35 +1373,7 @@ private AcidDirInfo callInternal() throws IOException {
         // should be considered as usual.
         parsedDeltas.addAll(dirInfo.getCurrentDirectories());
       }
-      return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
-    }
-
-    private List<HdfsFileStatusWithId> findBaseFiles(
-        Path base, Ref<Boolean> useFileIds) throws IOException {
-      Boolean val = useFileIds.value;
-      if (val == null || val) {
-        try {
-          List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
-              fs, base, AcidUtils.hiddenFileFilter);
-          if (val == null) {
-            useFileIds.value = true; // The call succeeded, so presumably the API is there.
-          }
-          return result;
-        } catch (Throwable t) {
-          LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-          if (val == null && t instanceof UnsupportedOperationException) {
-            useFileIds.value = false;
-          }
-        }
-      }
-
-      // Fall back to regular API and create states without ID.
-      List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter);
-      List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
-      for (FileStatus child : children) {
-        result.add(AcidUtils.createOriginalObj(null, child));
-      }
-      return result;
+      return new AcidDirInfo(fs.get(), dir, dirInfo, baseFiles, parsedDeltas);
     }
   }
 
@@ -1832,8 +1819,14 @@ private long computeProjectionSize(List<OrcProto.Type> fileTypes,
     Path[] paths = getInputPaths(conf);
     CompletionService<AcidDirInfo> ecs = new ExecutorCompletionService<>(Context.threadPool);
     for (Path dir : paths) {
-      FileSystem fs = dir.getFileSystem(conf);
-      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds, ugi);
+      Supplier<FileSystem> fsSupplier = () -> {
+        try {
+          return dir.getFileSystem(conf);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+      FileGenerator fileGenerator = new FileGenerator(context, fsSupplier, dir, useFileIds, ugi);
       pathFutures.add(ecs.submit(fileGenerator));
     }
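The constructor change above replaces the eager FileSystem argument with a java.util.function.Supplier<FileSystem>, so resolving the file system is deferred until a generator actually needs it and can be skipped when the cached directory state is served. A minimal sketch of that pattern with an illustrative resource type (not taken from the patch):

```java
import java.util.function.Supplier;

public class LazySupplierSketch {

  /** Illustrative stand-in for something costly to obtain, e.g. a FileSystem handle. */
  static class ExpensiveResource {
    ExpensiveResource() {
      System.out.println("expensive initialization ran");
    }
  }

  public static void main(String[] args) {
    // Nothing expensive happens here: the lambda only captures how to build the resource.
    Supplier<ExpensiveResource> lazy = () -> new ExpensiveResource();

    boolean servedFromCache = true;  // e.g. an AcidUtils dir cache hit
    if (!servedFromCache) {
      // Only the code path that really needs the resource pays for it.
      ExpensiveResource resource = lazy.get();
      System.out.println("using " + resource);
    } else {
      System.out.println("cache hit: resource was never created");
    }
  }
}
```

Note that the supplier in the patch is not memoized; each fs.get() goes back through Path.getFileSystem(conf), which normally hits Hadoop's own FileSystem cache, so repeated calls should stay cheap.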
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index e9f0b98157..9c02e08b86 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -526,7 +526,7 @@ public void testSplitStrategySelection() throws Exception {
       final OrcInputFormat.Context context = new OrcInputFormat.Context(
           conf, n);
       OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-          context, fs, new MockPath(fs, "mock:/a/b"), false, null);
+          context, () -> fs, new MockPath(fs, "mock:/a/b"), false, null);
       List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
       assertEquals(1, splitStrategies.size());
       final SplitStrategy splitStrategy = splitStrategies.get(0);
@@ -549,7 +549,7 @@ public void testSplitStrategySelection() throws Exception {
       final OrcInputFormat.Context context = new OrcInputFormat.Context(
           conf, n);
       OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-          context, fs, new MockPath(fs, "mock:/a/b"), false, null);
+          context, () -> fs, new MockPath(fs, "mock:/a/b"), false, null);
       List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
       assertEquals(1, splitStrategies.size());
       final SplitStrategy splitStrategy = splitStrategies.get(0);
@@ -567,14 +567,14 @@ public void testSplitStrategySelection() throws Exception {
   @Test
   public void testFileGenerator() throws Exception {
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
-    MockFileSystem fs = new MockFileSystem(conf,
+    final MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1]),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1]),
         new MockFile("mock:/a/b/_part-02", 1000, new byte[1]),
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1]));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a/b"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -582,13 +582,13 @@ public void testFileGenerator() throws Exception {
 
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs1 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1000]),
         new MockFile("mock:/a/b/_part-02", 1000, new byte[1000]),
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs1,
         new MockPath(fs, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -608,7 +608,7 @@ public void testACIDSplitStrategy() throws Exception {
         new MockFile("mock:/a/delta_001_002/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delta_001_002/bucket_000001", 1000, new byte[1], new MockBlock("host1")));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -630,11 +630,11 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
 
     // Case 1: Test with just originals => Single split strategy with two splits.
-    MockFileSystem fs = new MockFileSystem(conf,
+    final MockFileSystem fs = new MockFileSystem(conf,
        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -648,12 +648,13 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
 
     // Case 2: Test with originals and base => Single split strategy with two splits on compacted
     // base since the presence of a base will make the originals obsolete.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs1 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs1, new MockPath(fs1,
+        "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -665,14 +666,15 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
     assertFalse(splits.get(1).isOriginal());
 
     // Case 3: Test with originals and deltas => Two split strategies with two splits for each.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs3 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs3,
+        new MockPath(fs3, "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(2, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -697,12 +699,13 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
     // The reason why we are able to do so is because the valid user data has already been considered
     // as base for the covered buckets. Hence, the uncovered buckets do not have any relevant
     // data and we can just ignore them.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs4 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs4, new MockPath(fs4,
+        "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(2, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -718,7 +721,7 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
 
     // Case 5: Test with originals, compacted_base, insert_deltas, delete_deltas (exhaustive test)
     // This should just generate one strategy with splits for base and insert_deltas.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs5 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
@@ -727,7 +730,8 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
        new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
        new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs5, new MockPath(fs5,
+        "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -789,7 +793,9 @@ public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
     conf.set("fs.mock.impl", MockFileSystem.class.getName());
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
 
-    OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"),
+    OrcInputFormat.FileGenerator gen =
+        new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
+            "mock:/a"),
         false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -821,14 +827,14 @@ public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
   public void testBIStrategySplitBlockBoundary() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
-    MockFileSystem fs = new MockFileSystem(conf,
+    final MockFileSystem fs = new MockFileSystem(conf,
        new MockFile("mock:/a/b/part-00", 1000, new byte[1], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-01", 1000, new byte[1], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-02", 1000, new byte[1], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-03", 1000, new byte[1], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-04", 1000, new byte[1], new MockBlock("host1", "host2")));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a/b"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -838,14 +844,14 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
     assertEquals(5, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs0 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/part-00", 1000, new byte[1000], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-01", 1000, new byte[1000], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-02", 1000, new byte[1000], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-03", 1000, new byte[1000], new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs0,
+        new MockPath(fs0, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -854,7 +860,7 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
     assertEquals(5, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs1 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/part-00", 1000, new byte[1100], new MockBlock("host1", "host2"),
            new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-01", 1000, new byte[1100], new MockBlock("host1", "host2"),
@@ -865,8 +871,8 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
            new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-04", 1000, new byte[1100], new MockBlock("host1", "host2"),
            new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs1,
+        new MockPath(fs1, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -875,7 +881,7 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
     assertEquals(10, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs2 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/part-00", 1000, new byte[2000], new MockBlock("host1", "host2"),
            new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-01", 1000, new byte[2000], new MockBlock("host1", "host2"),
@@ -886,8 +892,8 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
            new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-04", 1000, new byte[2000], new MockBlock("host1", "host2"),
            new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs2,
+        new MockPath(fs2, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -896,7 +902,7 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
     assertEquals(10, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs3 = new MockFileSystem(conf,
        new MockFile("mock:/a/b/part-00", 1000, new byte[2200], new MockBlock("host1", "host2"),
            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-01", 1000, new byte[2200], new MockBlock("host1", "host2"),
@@ -907,8 +913,8 @@ public void testBIStrategySplitBlockBoundary() throws Exception {
            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
        new MockFile("mock:/a/b/part-04", 1000, new byte[2200], new MockBlock("host1", "host2"),
            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs3,
+        new MockPath(fs3, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -1009,9 +1015,9 @@ public void testEtlCombinedStrategy() throws Exception {
   }
 
   public OrcInputFormat.AcidDirInfo createAdi(
-      OrcInputFormat.Context context, MockFileSystem fs, String path) throws IOException {
+      OrcInputFormat.Context context, final MockFileSystem fs, String path) throws IOException {
     return new OrcInputFormat.FileGenerator(
-        context, fs, new MockPath(fs, path), false, null).call();
+        context, () -> fs, new MockPath(fs, path), false, null).call();
   }
 
   private List<OrcInputFormat.SplitStrategy<?>> createSplitStrategies(
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index a8f18d1791..6fe47d5d85 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -1086,7 +1086,7 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) thr
         AcidUtils.AcidOperationalProperties.getDefault().toInt());
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
     OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-        context, fs, root, false, null);
+        context, () -> fs, root, false, null);
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategies(
         null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,