diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 2f63524..e1d2395 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -155,6 +155,8 @@ public Reporter getReporter() {
   public static interface RawReader<V>
       extends RecordReader<RecordIdentifier, V> {
     public ObjectInspector getObjectInspector();
+
+    public boolean isDelete(V value);
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 74ea23b..23e92a2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -651,6 +651,11 @@ public ObjectInspector getObjectInspector() {
         (OrcStruct.createObjectInspector(rowType));
   }
 
+  @Override
+  public boolean isDelete(OrcStruct value) {
+    return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION;
+  }
+
   /**
    * Get the number of columns in the underlying rows.
    * @return 0 if there are no base and no deltas.
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 93d1bc0..1d3cf00 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -506,13 +506,15 @@ public void map(WritableComparable key, CompactorInputSplit split,
       ValidTxnList txnList =
           new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
 
+      boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =
-          aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(),
+          aif.getRawReader(jobConf, isMajor, split.getBucket(),
               txnList, split.getBaseDir(), split.getDeltaDirs());
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
       while (reader.next(identifier, value)) {
+        if (isMajor && reader.isDelete(value)) continue;
         writer.write(value);
         reporter.progress();
       }
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 421fbda..595e003 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -56,6 +56,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 public class TestOrcRawRecordMerger {
@@ -574,12 +575,14 @@ public void testNewBaseAndDelta() throws Exception {
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
     assertEquals("update 1", getValue(event));
+    assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
     assertEquals("second", getValue(event));
+    assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -616,6 +619,7 @@ public void testNewBaseAndDelta() throws Exception {
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
+    assertTrue(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index ec1379d..2f6c299 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -332,6 +332,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList
     private final Configuration conf;
     private FSDataInputStream is = null;
     private final FileSystem fs;
+    private boolean lastWasDelete = true;
 
     MockRawReader(Configuration conf, List<Path> files) throws IOException {
       filesToRead = new Stack<Path>();
@@ -346,6 +347,15 @@ public ObjectInspector getObjectInspector() {
     }
 
     @Override
+    public boolean isDelete(Text value) {
+      // Alternate between returning deleted and not.  This is easier than actually
+      // tracking operations.  We test that this is getting properly called by checking that only
+      // half the records show up in base files after major compactions.
+      lastWasDelete = !lastWasDelete;
+      return lastWasDelete;
+    }
+
+    @Override
     public boolean next(RecordIdentifier identifier, Text text) throws IOException {
       if (is == null) {
         // Open the next file
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 90a722b..576df97 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -428,8 +428,8 @@ public void majorTableWithBase() throws Exception {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(1248L, buckets[0].getLen());
-        Assert.assertEquals(1248L, buckets[1].getLen());
+        Assert.assertEquals(624L, buckets[0].getLen());
+        Assert.assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
@@ -475,8 +475,8 @@ public void majorPartitionWithBase() throws Exception {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(1248L, buckets[0].getLen());
-        Assert.assertEquals(1248L, buckets[1].getLen());
+        Assert.assertEquals(624L, buckets[0].getLen());
+        Assert.assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
@@ -520,8 +520,8 @@ public void majorTableNoBase() throws Exception {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
@@ -566,8 +566,8 @@ public void majorTableLegacy() throws Exception {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(1248L, buckets[0].getLen());
-        Assert.assertEquals(1248L, buckets[1].getLen());
+        Assert.assertEquals(624L, buckets[0].getLen());
+        Assert.assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
@@ -626,7 +626,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
 
     addBaseFile(conf, t, p, 20L, 20, 2, false);
     addDeltaFile(conf, t, p, 21L, 22L, 2, 2, false);
-    addDeltaFile(conf, t, p, 23L, 24L, 2);
+    addDeltaFile(conf, t, p, 23L, 26L, 4);
 
     burnThroughTransactions(25);
 
@@ -649,7 +649,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000024")) {
+      if (stat[i].getPath().getName().equals("base_0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath());
         Assert.assertEquals(2, buckets.length);
@@ -658,10 +658,12 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
         // Bucket 0 should be small and bucket 1 should be large, make sure that's the case
         Assert.assertTrue(
             ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen()
-                && "bucket_00001".equals(buckets[1].getPath().getName()) && 1248L == buckets[1].getLen())
+                && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L == buckets[1]
+                .getLen())
                 ||
             ("bucket_00000".equals(buckets[1].getPath().getName()) && 104L == buckets[1].getLen()
-                && "bucket_00001".equals(buckets[0].getPath().getName()) && 1248L == buckets[0].getLen())
+                && "bucket_00001".equals(buckets[0].getPath().getName()) && 676L == buckets[0]
+                .getLen())
         );
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
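Note (not part of the patch): the new RawReader#isDelete(V) hook lets the compactor drop delete events on the reader side. Because a major compaction rewrites the base and all deltas, a skipped event simply never reaches the new base file, which is why TestWorker's expected bucket sizes shrink (for example 1248L to 624L). Below is a minimal sketch of a caller driving the hook the same way CompactorMR.CompactorMap.map() does; the class name RawReaderSketch and the copyNonDeletes helper are made up for illustration, while the reader calls mirror the AcidInputFormat.RawReader API shown above.

// Sketch only -- not part of the patch.  RawReaderSketch/copyNonDeletes are
// hypothetical names; the reader calls follow AcidInputFormat.RawReader as
// used in CompactorMR.CompactorMap.map() above.
import java.io.IOException;

import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class RawReaderSketch {

  /**
   * Reads every event from an ACID raw reader and counts the ones a major
   * compaction would keep, i.e. everything isDelete() does not flag.
   */
  public static <V> long copyNonDeletes(AcidInputFormat.RawReader<V> reader) throws IOException {
    RecordIdentifier identifier = reader.createKey();  // reusable key, as in CompactorMap
    V value = reader.createValue();                    // reusable value buffer
    long kept = 0;
    while (reader.next(identifier, value)) {
      if (reader.isDelete(value)) {
        continue;                                      // delete event: do not copy it forward
      }
      kept++;                                          // a real caller would write 'value' here
    }
    return kept;
  }
}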