ORC BI split strategy: generate one split per block for files larger than a block.

For a non-empty file whose length exceeds its block size, BISplitStrategy emits one OrcSplit
per entry returned by SHIMS.getLocationsWithOffset, using that entry's offset, length and
hosts. Other non-empty files still produce a single whole-file split. The unused Context
field is dropped, and testBIStrategySplitBlockBoundary checks the resulting split counts.

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index cd2a668..e2bfc24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -881,7 +881,6 @@ private void runGetSplitsSync(List>> splitFutures,
     private final boolean isOriginal;
     private final List deltas;
     private final FileSystem fs;
-    private final Context context;
     private final Path dir;
     private final boolean allowSyntheticFileIds;
 
@@ -889,7 +888,6 @@ public BISplitStrategy(Context context, FileSystem fs,
         Path dir, List fileStatuses, boolean isOriginal,
         List deltas, boolean[] covered, boolean allowSyntheticFileIds) {
       super(dir, context.numBuckets, deltas, covered);
-      this.context = context;
       this.fileStatuses = fileStatuses;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
@@ -904,15 +902,29 @@ public BISplitStrategy(Context context, FileSystem fs,
       for (HdfsFileStatusWithId file : fileStatuses) {
         FileStatus fileStatus = file.getFileStatus();
         if (fileStatus.getLen() != 0) {
-          String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
-              .getHosts();
+          TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
           Object fileKey = file.getFileId();
           if (fileKey == null && allowSyntheticFileIds) {
             fileKey = new SyntheticFileId(fileStatus);
           }
-          OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, 0,
-              fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1);
-          splits.add(orcSplit);
+          final long blockSize = fileStatus.getBlockSize();
+          final long fileSize = fileStatus.getLen();
+          if (fileSize > blockSize) {
+            // One split per block. Each split carries the hosts of its own block, not those
+            // of the file's first block, so later blocks keep their locality hints.
+            for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
+              BlockLocation block = entry.getValue();
+              OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
+                  block.getLength(), block.getHosts(), null, isOriginal, true, deltas, -1);
+              splits.add(orcSplit);
+            }
+          } else {
+            // The file is no larger than one block: emit a single whole-file split.
+            String[] hosts = blockOffsets.firstEntry().getValue().getHosts();
+            OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, 0,
+                fileSize, hosts, null, isOriginal, true, deltas, -1);
+            splits.add(orcSplit);
+          }
         }
       }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index f824e18..72364ca 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -555,6 +555,101 @@ public void testFileGenerator() throws Exception {
   }
 
   @Test
+  public void testBIStrategySplitBlockBoundary() throws Exception {
+    conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/part-00", 1000, new byte[1], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-01", 1000, new byte[1], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-02", 1000, new byte[1], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-03", 1000, new byte[1], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-04", 1000, new byte[1], new MockBlock("host1", "host2")));
+    OrcInputFormat.FileGenerator gen =
+        new OrcInputFormat.FileGenerator(context, fs,
+            new MockPath(fs, "mock:/a/b"), false, null);
+    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    List splits = splitStrategy.getSplits();
+    int numSplits = splits.size();
+    assertEquals(5, numSplits);
+
+    context = new OrcInputFormat.Context(conf);
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/part-00", 1000, new byte[1000], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-01", 1000, new byte[1000], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-02", 1000, new byte[1000], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-03", 1000, new byte[1000], new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2")));
+    gen = new OrcInputFormat.FileGenerator(context, fs,
+        new MockPath(fs, "mock:/a/b"), false, null);
+    splitStrategy = createSplitStrategy(context, gen);
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    splits = splitStrategy.getSplits();
+    numSplits = splits.size();
+    assertEquals(5, numSplits);
+
+    context = new OrcInputFormat.Context(conf);
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/part-00", 1000, new byte[1100], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-01", 1000, new byte[1100], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-02", 1000, new byte[1100], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-03", 1000, new byte[1100], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-04", 1000, new byte[1100], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")));
+    gen = new OrcInputFormat.FileGenerator(context, fs,
+        new MockPath(fs, "mock:/a/b"), false, null);
+    splitStrategy = createSplitStrategy(context, gen);
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    splits = splitStrategy.getSplits();
+    numSplits = splits.size();
+    assertEquals(10, numSplits);
+
+    context = new OrcInputFormat.Context(conf);
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/part-00", 1000, new byte[2000], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-01", 1000, new byte[2000], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-02", 1000, new byte[2000], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-03", 1000, new byte[2000], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-04", 1000, new byte[2000], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2")));
+    gen = new OrcInputFormat.FileGenerator(context, fs,
+        new MockPath(fs, "mock:/a/b"), false, null);
+    splitStrategy = createSplitStrategy(context, gen);
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    splits = splitStrategy.getSplits();
+    numSplits = splits.size();
+    assertEquals(10, numSplits);
+
+    context = new OrcInputFormat.Context(conf);
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/part-00", 1000, new byte[2200], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-01", 1000, new byte[2200], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-02", 1000, new byte[2200], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-03", 1000, new byte[2200], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
+        new MockFile("mock:/a/b/part-04", 1000, new byte[2200], new MockBlock("host1", "host2"),
+            new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
+    gen = new OrcInputFormat.FileGenerator(context, fs,
+        new MockPath(fs, "mock:/a/b"), false, null);
+    splitStrategy = createSplitStrategy(context, gen);
+    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    splits = splitStrategy.getSplits();
+    numSplits = splits.size();
+    assertEquals(15, numSplits);
+  }
+
+  @Test
   public void testEtlCombinedStrategy() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000");