diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index cd2a668..eb1dc4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -878,6 +878,7 @@ private void runGetSplitsSync(List>> splitFutures, */ static final class BISplitStrategy extends ACIDSplitStrategy { private final List fileStatuses; + private static final float BLOCK_SLOP = 1.1f; // 10% private final boolean isOriginal; private final List deltas; private final FileSystem fs; @@ -910,9 +911,35 @@ public BISplitStrategy(Context context, FileSystem fs, if (fileKey == null && allowSyntheticFileIds) { fileKey = new SyntheticFileId(fileStatus); } - OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, 0, - fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1); - splits.add(orcSplit); + final long blockSize = fileStatus.getBlockSize(); + final long fileSize = fileStatus.getLen(); + + // consider 10% slop for last stripe, metadata or footers that may overshoot block boundary + if (fileSize > (blockSize * BLOCK_SLOP)) { + long startOffset = 0; + long remaining = fileSize; + long endOffset = Math.min(startOffset + blockSize, fileSize); + long length = endOffset - startOffset; + + while (startOffset < fileSize && remaining > 0) { + OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, startOffset, + length, hosts, null, isOriginal, true, deltas, -1); + splits.add(orcSplit); + remaining = remaining - length; + startOffset = endOffset; + endOffset = Math.min(startOffset + blockSize, fileSize); + + // if last block overshoots (within 10%) then consider it part of previous block + if (remaining <= (blockSize * BLOCK_SLOP)) { + endOffset = fileSize; + } + length = endOffset - startOffset; + } + } else { + OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, 0, + fileStatus.getLen(), hosts, 
null, isOriginal, true, deltas, -1); + splits.add(orcSplit); + } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index f824e18..3f9a3bb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -555,6 +555,131 @@ public void testFileGenerator() throws Exception { } @Test + public void testBIStrategySplitBlockBoundary() throws Exception { + conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); + OrcInputFormat.Context context = new OrcInputFormat.Context(conf); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[1], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[1], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[1], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[1], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[1], new MockBlock("host1", "host2"))); + OrcInputFormat.FileGenerator gen = + new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + List splits = splitStrategy.getSplits(); + int numSplits = splits.size(); + assertEquals(5, numSplits); + + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[1000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[1000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[1000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new 
byte[1000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(5, numSplits); + + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[1100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[1100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[1100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[1100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[1100], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(5, numSplits); + + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[1101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[1101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[1101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[1101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[1101], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = 
createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(10, numSplits); + + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[2000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[2000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[2000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[2000], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[2000], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(10, numSplits); + + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[2100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[2100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[2100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[2100], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[2100], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(10, numSplits); + + context = new 
OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[2101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[2101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[2101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[2101], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[2101], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(15, numSplits); + + context = new OrcInputFormat.Context(conf); + fs = new MockFileSystem(conf, + new MockFile("mock:/a/b/part-00", 1000, new byte[2200], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-01", 1000, new byte[2200], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-02", 1000, new byte[2200], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-03", 1000, new byte[2200], new MockBlock("host1", "host2")), + new MockFile("mock:/a/b/part-04", 1000, new byte[2200], new MockBlock("host1", "host2"))); + gen = new OrcInputFormat.FileGenerator(context, fs, + new MockPath(fs, "mock:/a/b"), false, null); + splitStrategy = createSplitStrategy(context, gen); + assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); + splits = splitStrategy.getSplits(); + numSplits = splits.size(); + assertEquals(15, numSplits); + } + + @Test public void testEtlCombinedStrategy() throws Exception { conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL"); conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000");