diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 5d6c9da..11b21bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -374,6 +374,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private final int numBuckets;
     private final long maxSize;
    private final long minSize;
+    private final int minSplits;
     private final boolean footerInSplits;
     private final boolean cacheStripeDetails;
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
@@ -381,7 +382,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private ValidTxnList transactionList;
     private SplitStrategyKind splitStrategyKind;
 
-    Context(Configuration conf) {
+    Context(Configuration conf, final int minSplits) {
       this.conf = conf;
       minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
@@ -404,6 +405,8 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
       cacheStripeDetails = (cacheStripeDetailsSize > 0);
 
+      this.minSplits = Math.min(cacheStripeDetailsSize, minSplits);
+
       synchronized (Context.class) {
         if (threadPool == null) {
           threadPool = Executors.newFixedThreadPool(numThreads,
@@ -681,7 +684,7 @@ public SplitStrategy call() throws IOException {
             break;
           default:
             // HYBRID strategy
-            if (avgFileSize > context.maxSize) {
+            if (avgFileSize > context.maxSize || numFiles < context.minSplits) {
              splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
                  deltas, covered);
            } else {
@@ -983,8 +986,13 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
 
   static List<OrcSplit> generateSplitsInfo(Configuration conf)
       throws IOException {
+    return generateSplitsInfo(conf, -1);
+  }
+
+  static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits)
+      throws IOException {
     // use threads to resolve directories into splits
-    Context context = new Context(conf);
+    Context context = new Context(conf, numSplits);
     List<OrcSplit> splits = Lists.newArrayList();
     List<Future<?>> pathFutures = Lists.newArrayList();
     List<Future<?>> splitFutures = Lists.newArrayList();
@@ -1049,7 +1057,7 @@ private static void cancelFutures(List<Future<?>> futures) {
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    List<OrcSplit> result = generateSplitsInfo(job);
+    List<OrcSplit> result = generateSplitsInfo(job, numSplits);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 0246cd5..a1635ec 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -396,7 +396,7 @@ public void testGetInputPaths() throws Exception {
 
   @Test
   public void testFileGenerator() throws Exception {
-    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf, -1);
     MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[0]),
         new MockFile("mock:/a/b/part-01", 1000, new byte[0]),
@@ -409,8 +409,21 @@ public void testFileGenerator() throws Exception {
     SplitStrategy splitStrategy = gen.call();
     assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    /* file count conditions of hybrid */
+    context = new OrcInputFormat.Context(conf, 3);
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs,
+        "mock:/a/b"));
+    splitStrategy = gen.call();
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+
+    context = new OrcInputFormat.Context(conf, 10);
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs,
+        "mock:/a/b"));
+    splitStrategy = gen.call();
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
+
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
-    context = new OrcInputFormat.Context(conf);
+    context = new OrcInputFormat.Context(conf, -1);
     fs = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1000]),
@@ -834,7 +847,7 @@ public void testAddSplit() throws Exception {
         new MockBlock("host0", "host3-2", "host3-3"),
         new MockBlock("host4-1", "host4-2", "host4-3"),
         new MockBlock("host5-1", "host5-2", "host5-3")));
-    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf, -1);
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
             fs.getFileStatus(new Path("/a/file")), null, true,
@@ -875,7 +888,7 @@ public void testSplitGenerator() throws Exception {
         new MockBlock("host5-1", "host5-2", "host5-3")));
     conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 300);
     conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 200);
-    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf, -1);
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
             fs.getFileStatus(new Path("/a/file")), null, true,
@@ -899,7 +912,7 @@ public void testSplitGenerator() throws Exception {
     // test min = 0, max = 0 generates each stripe
     conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 0);
     conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
-    context = new OrcInputFormat.Context(conf);
+    context = new OrcInputFormat.Context(conf, -1);
     splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
         fs.getFileStatus(new Path("/a/file")), null, true,
         new ArrayList(), true, null, null));
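
Reviewer note: the patch threads the numSplits hint from getSplits() through to the HYBRID
strategy choice. When a directory holds fewer files than the requested minimum number of
splits, the ETL strategy (which can split per stripe) is chosen over BI (which emits at
most one split per file and so can never satisfy a larger hint). The hint is also clamped
to the footer-cache size, presumably so a forced ETL pass never reads more footers than
the cache can hold, and the default of -1 keeps the old behavior since numFiles < -1 is
never true. The following standalone Java sketch models only that decision; it is not
Hive source, and the names HybridStrategySketch, Strategy, and choose are made up for
illustration -- only the predicate mirrors the patched branch:

// Standalone sketch of the patched HYBRID selection; all names hypothetical.
public class HybridStrategySketch {

  enum Strategy { BI, ETL }

  // Mirrors: if (avgFileSize > context.maxSize || numFiles < context.minSplits)
  static Strategy choose(long avgFileSize, long maxSize, int numFiles, int minSplits) {
    // BI emits at most one split per file, so it cannot satisfy a split hint
    // larger than the file count; ETL can split individual stripes.
    if (avgFileSize > maxSize || numFiles < minSplits) {
      return Strategy.ETL;
    }
    return Strategy.BI;
  }

  public static void main(String[] args) {
    long maxSize = 256L * 1024 * 1024;                 // assumed 256 MB max split size
    System.out.println(choose(1000, maxSize, 5, -1));  // BI  (no hint: old behavior)
    System.out.println(choose(1000, maxSize, 5, 3));   // BI  (5 files cover a hint of 3)
    System.out.println(choose(1000, maxSize, 5, 10));  // ETL (5 files < hint of 10)
  }
}

Run as-is, the three cases print BI, BI, ETL, following the same pattern as the
expectations added to testFileGenerator (hint 3 -> BISplitStrategy, hint 10 ->
ETLSplitStrategy).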