diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 5d6c9da..62e6de7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -374,6 +374,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private final int numBuckets;
     private final long maxSize;
     private final long minSize;
+    private final int minSplits;
     private final boolean footerInSplits;
     private final boolean cacheStripeDetails;
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
@@ -382,6 +383,10 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
     private SplitStrategyKind splitStrategyKind;
 
     Context(Configuration conf) {
+      this(conf, 1);
+    }
+
+    Context(Configuration conf, final int minSplits) {
       this.conf = conf;
       minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
@@ -404,6 +409,8 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
 
       cacheStripeDetails = (cacheStripeDetailsSize > 0);
 
+      this.minSplits = Math.min(cacheStripeDetailsSize, minSplits);
+
       synchronized (Context.class) {
         if (threadPool == null) {
           threadPool = Executors.newFixedThreadPool(numThreads,
@@ -681,7 +688,7 @@ public SplitStrategy call() throws IOException {
             break;
           default:
             // HYBRID strategy
-            if (avgFileSize > context.maxSize) {
+            if (avgFileSize > context.maxSize || numFiles <= context.minSplits) {
               splitStrategy = new ETLSplitStrategy(context, fs, dir, children,
                   isOriginal, deltas, covered);
             } else {
@@ -983,8 +990,13 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
 
   static List<OrcSplit> generateSplitsInfo(Configuration conf)
       throws IOException {
+    return generateSplitsInfo(conf, -1);
+  }
+
+  static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits)
+      throws IOException {
     // use threads to resolve directories into splits
-    Context context = new Context(conf);
+    Context context = new Context(conf, numSplits);
     List<OrcSplit> splits = Lists.newArrayList();
     List<Future<?>> pathFutures = Lists.newArrayList();
     List<Future<?>> splitFutures = Lists.newArrayList();
@@ -1049,7 +1061,7 @@ private static void cancelFutures(List<Future<?>> futures) {
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    List<OrcSplit> result = generateSplitsInfo(job);
+    List<OrcSplit> result = generateSplitsInfo(job, numSplits);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
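Note on the core change: getSplits() now forwards the caller's requested split count into Context through the new generateSplitsInfo(conf, numSplits) overload, and Context clamps it against the footer-cache size (Math.min(cacheStripeDetailsSize, minSplits)), so the new ETL fallback can only trigger when stripe details can actually be cached. In rough terms, the HYBRID decision reduces to the predicate below. This is a minimal free-standing sketch for illustration; chooseStrategy is a hypothetical helper, not code from the patch (the real logic lives in FileGenerator.call()):

    // Sketch of the HYBRID split-strategy decision after this patch.
    // ETL reads file footers up front (better splits, more I/O per file);
    // BI fabricates splits from file sizes alone (cheap, but coarser).
    static String chooseStrategy(long totalFileSize, int numFiles,
                                 long maxSize, int minSplits) {
      long avgFileSize = totalFileSize / numFiles;
      // Footer reads pay off for large files, and now also when the
      // directory has too few files to cover the requested parallelism
      // (minSplits is the requested split count, pre-clamped by the
      // footer-cache size).
      if (avgFileSize > maxSize || numFiles <= minSplits) {
        return "ETLSplitStrategy";
      }
      return "BISplitStrategy";
    }

The effect is that a directory with fewer files than the requested parallelism takes the ETL path, whose footer reads let a single file be carved into multiple stripe-aligned splits, whereas BI emits splits from file sizes alone.
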
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 0246cd5..12ae902 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -23,8 +23,11 @@
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.sql.Date;
@@ -67,6 +70,7 @@
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategyKind;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -394,6 +398,97 @@ public void testGetInputPaths() throws Exception {
         OrcInputFormat.getInputPaths(conf));
   }
 
+  private FileSystem generateMockFiles(final int count, final int size) {
+    final byte[] data = new byte[size];
+    MockFile[] files = new MockFile[count];
+    for (int i = 0; i < count; i++) {
+      files[i] = new MockFile(String.format("mock:/a/b/part-%d", i), size, data);
+    }
+    return new MockFileSystem(conf, files);
+  }
+
+  @Test
+  public void testSplitStrategySelection() throws Exception {
+
+    conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
+    conf.setLong(HiveConf.ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE.varname,
+        100);
+    final int[] counts = { 1, 10, 100, 256 };
+    final int[] sizes = { 100, 1000 };
+    final int[] numSplits = { 1, 9, 10, 11, 99, 111 };
+    final String[] strategyResults = new String[] {
+        "ETLSplitStrategy", /* 1 files x 100 size for 1 splits */
+        "ETLSplitStrategy", /* 1 files x 100 size for 9 splits */
+        "ETLSplitStrategy", /* 1 files x 100 size for 10 splits */
+        "ETLSplitStrategy", /* 1 files x 100 size for 11 splits */
+        "ETLSplitStrategy", /* 1 files x 100 size for 99 splits */
+        "ETLSplitStrategy", /* 1 files x 100 size for 111 splits */
+        "ETLSplitStrategy", /* 1 files x 1000 size for 1 splits */
+        "ETLSplitStrategy", /* 1 files x 1000 size for 9 splits */
+        "ETLSplitStrategy", /* 1 files x 1000 size for 10 splits */
+        "ETLSplitStrategy", /* 1 files x 1000 size for 11 splits */
+        "ETLSplitStrategy", /* 1 files x 1000 size for 99 splits */
+        "ETLSplitStrategy", /* 1 files x 1000 size for 111 splits */
+        "BISplitStrategy", /* 10 files x 100 size for 1 splits */
+        "BISplitStrategy", /* 10 files x 100 size for 9 splits */
+        "ETLSplitStrategy", /* 10 files x 100 size for 10 splits */
+        "ETLSplitStrategy", /* 10 files x 100 size for 11 splits */
+        "ETLSplitStrategy", /* 10 files x 100 size for 99 splits */
+        "ETLSplitStrategy", /* 10 files x 100 size for 111 splits */
+        "ETLSplitStrategy", /* 10 files x 1000 size for 1 splits */
+        "ETLSplitStrategy", /* 10 files x 1000 size for 9 splits */
+        "ETLSplitStrategy", /* 10 files x 1000 size for 10 splits */
+        "ETLSplitStrategy", /* 10 files x 1000 size for 11 splits */
+        "ETLSplitStrategy", /* 10 files x 1000 size for 99 splits */
+        "ETLSplitStrategy", /* 10 files x 1000 size for 111 splits */
+        "BISplitStrategy", /* 100 files x 100 size for 1 splits */
+        "BISplitStrategy", /* 100 files x 100 size for 9 splits */
+        "BISplitStrategy", /* 100 files x 100 size for 10 splits */
+        "BISplitStrategy", /* 100 files x 100 size for 11 splits */
+        "BISplitStrategy", /* 100 files x 100 size for 99 splits */
+        "ETLSplitStrategy", /* 100 files x 100 size for 111 splits */
+        "ETLSplitStrategy", /* 100 files x 1000 size for 1 splits */
+        "ETLSplitStrategy", /* 100 files x 1000 size for 9 splits */
+        "ETLSplitStrategy", /* 100 files x 1000 size for 10 splits */
+        "ETLSplitStrategy", /* 100 files x 1000 size for 11 splits */
+        "ETLSplitStrategy", /* 100 files x 1000 size for 99 splits */
+        "ETLSplitStrategy", /* 100 files x 1000 size for 111 splits */
+        "BISplitStrategy", /* 256 files x 100 size for 1 splits */
+        "BISplitStrategy", /* 256 files x 100 size for 9 splits */
+        "BISplitStrategy", /* 256 files x 100 size for 10 splits */
+        "BISplitStrategy", /* 256 files x 100 size for 11 splits */
+        "BISplitStrategy", /* 256 files x 100 size for 99 splits */
+        "BISplitStrategy", /* 256 files x 100 size for 111 splits */
+        "ETLSplitStrategy", /* 256 files x 1000 size for 1 splits */
+        "ETLSplitStrategy", /* 256 files x 1000 size for 9 splits */
+        "ETLSplitStrategy", /* 256 files x 1000 size for 10 splits */
+        "ETLSplitStrategy", /* 256 files x 1000 size for 11 splits */
+        "ETLSplitStrategy", /* 256 files x 1000 size for 99 splits */
+        "ETLSplitStrategy", /* 256 files x 1000 size for 111 splits */
+    };
+
+    int k = 0;
+
+    for (int c : counts) {
+      for (int s : sizes) {
+        final FileSystem fs = generateMockFiles(c, s);
+        for (int n : numSplits) {
+          final OrcInputFormat.Context context = new OrcInputFormat.Context(
+              conf, n);
+          OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
+              context, fs, new MockPath(fs, "mock:/a/b"));
+          final SplitStrategy splitStrategy = gen.call();
+          assertTrue(
+              String.format(
+                  "Split strategy for %d files x %d size for %d splits", c, s,
+                  n),
+              splitStrategy.getClass().getSimpleName()
+                  .equals(strategyResults[k++]));
+        }
+      }
+    }
+  }
+
   @Test
   public void testFileGenerator() throws Exception {
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
@@ -1115,7 +1210,7 @@ public void testEmptyFile() throws Exception {
     InputFormat<?,?> in = new OrcInputFormat();
     FileInputFormat.setInputPaths(conf, testFilePath.toString());
     InputSplit[] splits = in.getSplits(conf, 1);
-    assertTrue(1 == splits.length);
+    assertTrue(0 == splits.length);
     assertEquals(null, serde.getSerDeStats());
   }
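
Note on the tests: testSplitStrategySelection sweeps {1, 10, 100, 256} files x {100, 1000} bytes x six requested split counts and pins the strategy FileGenerator picks for each combination; the 48 expectations follow mechanically from the predicate in the earlier note, given split.maxsize=500 and a footer cache of 100 (so the effective minSplits is min(100, requested)). The testEmptyFile expectation flips from 1 split to 0, plausibly because a single (empty) file now satisfies numFiles <= minSplits, takes the ETL path, and an ORC file with no stripes yields no splits. A quick cross-check of a few table rows, reusing the hypothetical chooseStrategy sketch from the earlier note (paste alongside it; not code from the patch):

    // Cross-check a few strategyResults rows against the sketch above.
    public static void main(String[] args) {
      long maxSize = 500; // mapreduce.input.fileinputformat.split.maxsize
      int[][] cases = { // {numFiles, fileSize, requestedSplits}
          { 10, 100, 9 },    // 10 > min(100, 9) = 9, avg 100 <= 500 -> BI
          { 10, 100, 10 },   // 10 <= min(100, 10) = 10             -> ETL
          { 100, 100, 111 }, // 100 <= min(100, 111) = 100          -> ETL
          { 256, 100, 111 }, // 256 > 100, avg 100 <= 500           -> BI
          { 256, 1000, 1 },  // avg 1000 > 500                      -> ETL
      };
      for (int[] c : cases) {
        int minSplits = Math.min(100, c[2]); // clamp by footer-cache size
        System.out.printf("%d files x %d size for %d splits -> %s%n",
            c[0], c[1], c[2],
            chooseStrategy((long) c[0] * c[1], c[0], maxSize, minSplits));
      }
    }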