diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 647e7c8..0813033 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1226,7 +1226,8 @@ private AcidDirInfo callInternal() throws IOException {
     private final boolean hasBase;
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;
-    private final List<OrcSplit> deltaSplits;
+    private List<OrcSplit> deltaSplits;
+    private final SplitInfo splitInfo;
     private final ByteBuffer ppdResult;
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
@@ -1249,6 +1250,7 @@ public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
       this.hasBase = splitInfo.hasBase;
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
+      this.splitInfo = splitInfo;
       this.allowSyntheticFileIds = allowSyntheticFileIds;
       this.ppdResult = splitInfo.ppdResult;
     }
@@ -1423,6 +1425,7 @@ public String toString() {
                 stripeStats, stripes.size(), file.getPath(), evolution);
         }
       }
+      return generateSplitsFromStripes(includeStripe);
     }
 
   }
@@ -1455,31 +1458,44 @@ public String toString() {
     private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
       List<OrcSplit> splits = new ArrayList<>(stripes.size());
-      // if we didn't have predicate pushdown, read everything
-      if (includeStripe == null) {
-        includeStripe = new boolean[stripes.size()];
-        Arrays.fill(includeStripe, true);
-      }
-
-      OffsetAndLength current = new OffsetAndLength();
-      int idx = -1;
-      for (StripeInformation stripe : stripes) {
-        idx++;
-
-        if (!includeStripe[idx]) {
-          // create split for the previous unfinished stripe
-          if (current.offset != -1) {
-            splits.add(createSplit(current.offset, current.length, orcTail));
-            current.offset = -1;
-          }
-          continue;
+      // after major compaction, base files may become empty base files. Following sequence is an example
+      // 1) insert some rows
+      // 2) delete all rows
+      // 3) major compaction
+      // 4) insert some rows
+      // In such cases, consider base files without any stripes as uncovered delta
+      if (stripes == null || stripes.isEmpty()) {
+        AcidOutputFormat.Options options = AcidUtils.parseBaseOrDeltaBucketFilename(file.getPath(), context.conf);
+        int bucket = options.getBucket();
+        splitInfo.covered[bucket] = false;
+        deltaSplits = splitInfo.getSplits();
+      } else {
+        // if we didn't have predicate pushdown, read everything
+        if (includeStripe == null) {
+          includeStripe = new boolean[stripes.size()];
+          Arrays.fill(includeStripe, true);
         }
-        current = generateOrUpdateSplit(
+
+        OffsetAndLength current = new OffsetAndLength();
+        int idx = -1;
+        for (StripeInformation stripe : stripes) {
+          idx++;
+
+          if (!includeStripe[idx]) {
+            // create split for the previous unfinished stripe
+            if (current.offset != -1) {
+              splits.add(createSplit(current.offset, current.length, orcTail));
+              current.offset = -1;
+            }
+            continue;
+          }
+
+          current = generateOrUpdateSplit(
             splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
+        }
+        generateLastSplit(splits, current, orcTail);
       }
-      generateLastSplit(splits, current, orcTail);
-
       // Add uncovered ACID delta splits.
       splits.addAll(deltaSplits);
       return splits;
     }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6726273..267de21 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -319,6 +319,79 @@ public void testOriginalFileReaderWhenNonAcidConvertedToAcid() throws Exception
     resultData = new int[][] {{3,8}, {5,6}, {9,20}};
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
+
+  @Test
+  public void testBICompactedNoStripes() throws Exception {
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] resultData = new int[][] {{1,2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID");
+  }
+
+  @Test
+  public void testETLCompactedNoStripes() throws Exception {
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] resultData = new int[][] {{1,2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    resultData = new int[][] {{3,4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    runStatementOnDriver("delete from " + Table.ACIDTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(0, rs.size());
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "HYBRID");
+  }
+
   /**
    * see HIVE-16177
    * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
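A note on the mechanics of the fix: SplitInfo.covered records, per bucket, whether a base split will merge that bucket's delta files at read time, and SplitInfo.getSplits() only emits standalone splits for the deltas of uncovered buckets. A base that a major compaction has emptied contains no stripes and therefore produces no splits at all, so leaving its bucket marked as covered silently hides every subsequent insert. The toy model below is a minimal sketch of that bookkeeping under simplified assumptions; covered[], uncoveredDeltaSplits() and the delta file names are hypothetical illustrations, not the Hive API.

import java.util.ArrayList;
import java.util.List;

public class CoveredDeltaSketch {

  // covered[b] == true means bucket b's deltas will be merged into the base
  // split for bucket b, so no standalone delta split is emitted for them.
  static List<String> uncoveredDeltaSplits(boolean[] covered, List<List<String>> deltasPerBucket) {
    List<String> splits = new ArrayList<>();
    for (int b = 0; b < covered.length; b++) {
      if (!covered[b]) {
        splits.addAll(deltasPerBucket.get(b));
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    // One bucket whose base file was emptied by a major compaction, plus one
    // delta written by an insert that ran after the compaction.
    boolean[] covered = {true};  // a base file exists, so the bucket starts covered
    List<List<String>> deltas = List.of(List.of("delta_5_5/bucket_00000"));

    int baseStripeCount = 0;     // the compacted base holds zero rows
    if (baseStripeCount == 0) {
      // The patched behavior: an empty base contributes no splits, so it must
      // not cover the bucket; flip the flag and regenerate the delta splits.
      covered[0] = false;
    }

    // Prints [delta_5_5/bucket_00000]: the post-compaction insert stays readable.
    System.out.println(uncoveredDeltaSplits(covered, deltas));
  }
}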
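On the test coverage: both new tests drive the same insert / delete-all / major-compact / insert sequence, once under the BI and once under the ETL value of hive.exec.orc.split.strategy, because the default HYBRID strategy may route split generation down either code path depending on file counts and sizes; each test restores HYBRID on exit so later tests run against the default.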