diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 73c2dcce2c..19e52db3da 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -27,6 +27,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -88,6 +89,7 @@
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -119,7 +121,6 @@
 import org.apache.orc.FileFormatException;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.Footer;
-import org.apache.orc.OrcProto.Type;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
@@ -614,6 +615,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private final int numBuckets;
     private final int splitStrategyBatchMs;
     private final long maxSize;
+    private final BitSet includedBuckets;
     private final long minSize;
     private final int etlFileThreshold;
     private final boolean footerInSplits;
@@ -622,24 +624,25 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
     private final AtomicInteger numFilesCounter = new AtomicInteger(0);
     private final ValidWriteIdList writeIdList;
-    private SplitStrategyKind splitStrategyKind;
+    private final SplitStrategyKind splitStrategyKind;
     private final SearchArgument sarg;
     private final AcidOperationalProperties acidOperationalProperties;
 
     Context(Configuration conf) throws IOException {
-      this(conf, 1, null);
+      this(conf, 1, null, null);
     }
 
     Context(Configuration conf, final int minSplits) throws IOException {
-      this(conf, minSplits, null);
+      this(conf, minSplits, null, null);
     }
 
     @VisibleForTesting
-    Context(Configuration conf, final int minSplits, ExternalFooterCachesByConf efc)
+    Context(Configuration conf, final int minSplits, ExternalFooterCachesByConf efc, BitSet includedBuckets)
         throws IOException {
       this.conf = conf;
       this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
       this.sarg = ConvertAstToSearchArg.createFromConf(conf);
+      this.includedBuckets = includedBuckets;
       minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMAXSPLITSIZE, DEFAULT_MAX_SPLIT_SIZE);
       String ss = conf.get(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
@@ -1290,7 +1293,25 @@ private AcidDirInfo callInternal() throws IOException {
         // should be considered as usual.
         parsedDeltas.addAll(dirInfo.getCurrentDirectories());
       }
-      return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
+      return new AcidDirInfo(fs, dir, dirInfo, pruneBuckets(baseFiles), parsedDeltas);
+    }
+
+    private List<AcidBaseFileInfo> pruneBuckets(List<AcidBaseFileInfo> baseFiles) {
+      if (context.includedBuckets == null) return baseFiles;
+
+      BitSet buckets = context.includedBuckets;
+      String bucketIn = buckets.toString();
+      List<AcidBaseFileInfo> filteredFileInfos = new ArrayList<>();
+      for (AcidBaseFileInfo fileInfo : baseFiles) {
+        int bucket = Utilities.getBucketIdFromFile(fileInfo.getHdfsFileStatusWithId().getFileStatus().getPath().getName());
+        if (bucket < 0 || buckets.get(bucket)) {
+          // match or UNKNOWN
+          filteredFileInfos.add(fileInfo);
+        } else {
+          LOG.info("Pruning with IN ({}) - removing {}", bucketIn, fileInfo.getHdfsFileStatusWithId().getFileStatus().getPath());
+        }
+      }
+      return filteredFileInfos;
     }
 
     private List<HdfsFileStatusWithId> findBaseFiles(
@@ -1776,7 +1797,7 @@ private long computeProjectionSize(List<OrcProto.Type> fileTypes,
     // complete path futures and schedule split generation
     try {
       CombinedCtx combinedCtx = (context.splitStrategyBatchMs > 0) ? new CombinedCtx() : null;
-      long maxWaitUs = context.splitStrategyBatchMs * 1000000;
+      long maxWaitUs = context.splitStrategyBatchMs * 1000000L;
       int resultsLeft = paths.length;
       while (resultsLeft > 0) {
         AcidDirInfo adi = null;
@@ -1932,12 +1953,13 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte
       LOG.debug("getSplits started");
     }
     Configuration conf = job;
+    MapWork work = Utilities.getMapWork(job);
     if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
       // Create HiveConf once, since this is expensive.
       conf = new HiveConf(conf, OrcInputFormat.class);
     }
     List<OrcSplit> result = generateSplitsInfo(conf,
-        new Context(conf, numSplits, createExternalCaches()));
+        new Context(conf, numSplits, createExternalCaches(), work != null ? work.getIncludedBuckets() : null));
     if (LOG.isDebugEnabled()) {
       LOG.debug("getSplits finished");
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
index 5669ddc7ae..90b9031874 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
@@ -142,6 +142,6 @@ private Context createContext(Configuration conf, int numSplits) throws IOExcept
       // Create HiveConf once, since this is expensive.
       conf = new HiveConf(conf, OrcInputFormat.class);
     }
-    return new Context(conf, numSplits, null);
+    return new Context(conf, numSplits);
   }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 326c7f65bf..171b678b33 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -2714,7 +2714,7 @@ public void testDoAs() throws Exception {
     ugi.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        OrcInputFormat.generateSplitsInfo(conf, new Context(conf, -1, null));
+        OrcInputFormat.generateSplitsInfo(conf, new Context(conf, -1));
         return null;
       }
     });
@@ -2733,7 +2733,7 @@ public Void run() throws Exception {
     }
     assertEquals(1, OrcInputFormat.Context.getCurrentThreadPoolSize());
     FileInputFormat.setInputPaths(conf, "mock:/ugi/2");
-    List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, new Context(conf, -1, null));
+    List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, new Context(conf, -1));
       assertEquals(1, splits.size());
     } finally {
       MockFileSystem.clearGlobalFiles();
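
Reviewer note: the heart of this patch is the keep/drop predicate in pruneBuckets(). A base file survives when its bucket id cannot be determined (bucket < 0, i.e. UNKNOWN) or when its bit is set in the BitSet handed down from MapWork.getIncludedBuckets(). The standalone sketch below reproduces that predicate outside of Hive so it can be exercised in isolation; parseBucketId is a hypothetical stand-in for Utilities.getBucketIdFromFile, and the NNNNNN_M file-name pattern is an assumption for illustration only, not Hive's actual naming contract.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BucketPruningSketch {
  // Hypothetical stand-in for Utilities.getBucketIdFromFile: returns the
  // bucket id encoded in a name such as "000002_0", or -1 (UNKNOWN) when
  // the name does not match the assumed pattern.
  private static final Pattern BUCKET_FILE = Pattern.compile("^(\\d{6})_\\d+.*");

  static int parseBucketId(String fileName) {
    Matcher m = BUCKET_FILE.matcher(fileName);
    return m.matches() ? Integer.parseInt(m.group(1)) : -1;
  }

  // Mirrors the predicate in pruneBuckets(): keep a file when its bucket id
  // is UNKNOWN (< 0) or when the corresponding bit is set in includedBuckets.
  static List<String> pruneBuckets(List<String> fileNames, BitSet includedBuckets) {
    if (includedBuckets == null) return fileNames; // no pruning requested
    List<String> kept = new ArrayList<>();
    for (String name : fileNames) {
      int bucket = parseBucketId(name);
      if (bucket < 0 || includedBuckets.get(bucket)) {
        kept.add(name); // match or UNKNOWN
      } else {
        System.out.println("Pruning with IN " + includedBuckets + " - removing " + name);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    BitSet included = new BitSet();
    included.set(0);
    included.set(2);
    List<String> files = List.of("000000_0", "000001_0", "000002_0", "_metadata");
    // Prints [000000_0, 000002_0, _metadata]: buckets 0 and 2 match, and the
    // unparseable "_metadata" is kept as UNKNOWN; only 000001_0 is pruned.
    System.out.println(pruneBuckets(files, included));
  }
}

Keeping UNKNOWN files is the conservative choice: a name the parser cannot interpret must never cause data to be dropped, only the optimization to be forgone.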
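
A second, easy-to-miss fix in this diff is the 1000000L suffix in maxWaitUs. The old expression context.splitStrategyBatchMs * 1000000 multiplies two ints in 32-bit arithmetic and only then widens the (possibly overflowed) result to long, so any batch window above roughly 2147 ms went negative. The long literal forces 64-bit multiplication. A minimal demonstration, independent of Hive:

public class OverflowDemo {
  public static void main(String[] args) {
    int splitStrategyBatchMs = 3000; // a 3 second batch window
    // Before the fix: int * int overflows, then the garbage is widened to long.
    long broken = splitStrategyBatchMs * 1000000;   // -1294967296
    // After the fix: the long literal promotes the multiplication to 64 bits.
    long fixed = splitStrategyBatchMs * 1000000L;   // 3000000000
    System.out.println(broken + " vs " + fixed);
  }
}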