diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index beb36d7674..5af047f465 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -80,7 +80,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.hive.common.util.Retry;
-import org.apache.hive.common.util.RetryTestRunner;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
@@ -1459,25 +1458,10 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception
     checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0,
         1L, 2L, 1);
 
-    // Verify that we have got correct set of delete_deltas.
+    // Assert that no delete deltas are produced when there are no input delete events.
     FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
         AcidUtils.deleteEventDeltaDirFilter);
-    String[] deleteDeltas = new String[deleteDeltaStat.length];
-    Path minorCompactedDeleteDelta = null;
-    for (int i = 0; i < deleteDeltas.length; i++) {
-      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000002_v0000005")) {
-        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
-      }
-    }
-    Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002_v0000005"};
-    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
-    }
-    // There should be no rows in the delete_delta because there have been no delete events.
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0,
-        0L, 0L, 1);
+    assertEquals(0, deleteDeltaStat.length);
   }
 
   @Test
@@ -1550,25 +1534,10 @@ public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
     checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, 0,
         1L, 4L, 1);
 
-    // Verify that we have got correct set of delete_deltas also
+    // Assert that no delete deltas are produced when there are no input delete events.
     FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
         AcidUtils.deleteEventDeltaDirFilter);
-    String[] deleteDeltas = new String[deleteDeltaStat.length];
-    Path minorCompactedDeleteDelta = null;
-    for (int i = 0; i < deleteDeltas.length; i++) {
-      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000004_v0000009")) {
-        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
-      }
-    }
-    Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004_v0000009"};
-    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
-    }
-    // There should be no rows in the delete_delta because there have been no delete events.
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0,
-        0L, 0L, 1);
+    assertEquals(0, deleteDeltaStat.length);
 
     if (connection1 != null) {
       connection1.close();
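The same listStatus-based check now appears verbatim in both tests above; if a third caller ever needs it, it could be hoisted into a small helper. A minimal sketch, assuming the JUnit context of this test class (the helper name assertNoDeleteDeltas is hypothetical; fs.listStatus and AcidUtils.deleteEventDeltaDirFilter are the same calls the patch itself uses):

  private static void assertNoDeleteDeltas(FileSystem fs, String tableLocation) throws IOException {
    // Match only directories that follow the delete_delta_* naming convention.
    FileStatus[] deleteDeltaStat =
        fs.listStatus(new Path(tableLocation), AcidUtils.deleteEventDeltaDirFilter);
    // Include the offending paths in the failure message for easier debugging.
    assertEquals("Unexpected delete deltas: " + Arrays.toString(deleteDeltaStat),
        0, deleteDeltaStat.length);
  }
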
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c6cb7c5254..42ce1746fd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -962,23 +962,19 @@ public void map(WritableComparable key, CompactorInputSplit split,
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
 
-      AcidUtils.AcidOperationalProperties acidOperationalProperties
-          = AcidUtils.getAcidOperationalProperties(jobConf);
-
-      if (!isMajor && acidOperationalProperties.isSplitUpdate()) {
-        // When split-update is enabled for ACID, we initialize a separate deleteEventWriter
-        // that is used to write all the delete events (in case of minor compaction only). For major
-        // compaction, history is not required to be maintained hence the delete events are processed
-        // but not re-written separately.
-        getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
-      }
+      AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties(jobConf);
 
       while (reader.next(identifier, value)) {
         boolean sawDeleteRecord = reader.isDelete(value);
-        if (isMajor && sawDeleteRecord) continue;
-        if (sawDeleteRecord && deleteEventWriter != null) {
-          // When minor compacting, write delete events to a separate file when split-update is
-          // turned on.
+        if (isMajor && sawDeleteRecord) {
+          continue;
+        }
+        if (sawDeleteRecord && acidOperationalProperties.isSplitUpdate()) {
+          // Create the delete event writer lazily, on the first delete event seen,
+          // so that an empty delete_delta directory is never produced (HIVE-20941).
+          if (deleteEventWriter == null) {
+            getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
+          }
           deleteEventWriter.write(value);
           reporter.progress();
         } else {
@@ -1027,7 +1021,7 @@ private void getWriter(Reporter reporter, ObjectInspector inspector,
         .bucket(bucket)
         .statementId(-1)//setting statementId == -1 makes compacted delta files use
         .visibilityTxnId(getCompactorTxnId());
-    //delta_xxxx_yyyy format
+        //delta_xxxx_yyyy format
     // Instantiate the underlying output format
     @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
     AcidOutputFormat aof =
@@ -1040,28 +1034,25 @@ private void getWriter(Reporter reporter, ObjectInspector inspector,
   private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
       int bucket) throws IOException {
-    if (deleteEventWriter == null) {
-      AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
-      options.inspector(inspector)
-          .writingBase(false)
+
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+    options.inspector(inspector).writingBase(false)
         .writingDeleteDelta(true)   // this is the option which will make it a delete writer
         .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
-        .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
-        .reporter(reporter)
+        .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()).reporter(reporter)
         .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
-        .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
-        .bucket(bucket)
+        .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
         .statementId(-1)//setting statementId == -1 makes compacted delta files use
-        // delta_xxxx_yyyy format
+        // delta_xxxx_yyyy format
         .visibilityTxnId(getCompactorTxnId());
-      // Instantiate the underlying output format
-      @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
-      AcidOutputFormat aof =
+    // Instantiate the underlying output format
+    @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
+    AcidOutputFormat aof =
         instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
-      deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
-    }
+    deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
   }
 }
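The heart of the change is that deleteEventWriter is no longer created up front: it comes into existence inside the record loop, and only when a delete event actually arrives, so a minor compaction over insert-only deltas never opens (and never flushes) an empty delete_delta writer. A self-contained toy of the same pattern, with deliberately generic names (none of these are Hive APIs):

  import java.io.IOException;
  import java.io.StringWriter;
  import java.io.Writer;
  import java.util.List;

  class LazyDeleteWriterDemo {
    private Writer deleteEventWriter;            // intentionally not created up front

    void process(List<String> events) throws IOException {
      for (String event : events) {
        if (event.startsWith("DELETE")) {
          if (deleteEventWriter == null) {       // first delete event creates the writer
            deleteEventWriter = new StringWriter();
          }
          deleteEventWriter.write(event);
        }
      }
    }

    void close() throws IOException {
      if (deleteEventWriter != null) {           // the writer may never have been created
        deleteEventWriter.close();
      }
    }
  }

The corresponding consequence in the real code is that whatever closes deleteEventWriter must tolerate null, since with this change the writer may never have been instantiated; that was presumably already the case, because the field was also left null for major compactions before this patch.
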
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index dc39f5ef61..546ff955b7 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2248,6 +2248,32 @@ public void testAcidOrcWritePreservesFieldNames() throws Exception {
     return rs;
   }
 
+  /**
+   * Tests that no delete_delta_x_y directories are produced by minor compaction
+   * when there are no input delete events. See HIVE-20941.
+   * @throws Exception
+   */
+  @Test
+  public void testDeleteEventsCompaction() throws Exception {
+    int[][] tableData1 = {{1, 2}};
+    int[][] tableData2 = {{2, 3}};
+    int[][] tableData3 = {{3, 4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData1));
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3));
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MINOR));
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + Table.ACIDTBL.name().toLowerCase() + "/*"));
+    for (FileStatus fileStatus : fileStatuses) {
+      Assert.assertFalse(fileStatus.getPath().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX));
+    }
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
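The new test lists every child of the table directory and asserts that none carries the delete-delta prefix. An equivalent check, offered here only as a sketch and not part of the patch, globs directly for delete_delta_* children, which yields a count rather than a per-file boolean; TEST_WAREHOUSE_DIR, Table.ACIDTBL and AcidUtils.DELETE_DELTA_PREFIX are the same identifiers the test above uses, and FileSystem.globStatus returns an empty array when a wildcard pattern matches nothing:

  FileStatus[] deleteDeltas = fs.globStatus(new Path(
      TEST_WAREHOUSE_DIR + "/" + Table.ACIDTBL.name().toLowerCase() + "/"
          + AcidUtils.DELETE_DELTA_PREFIX + "*"));
  Assert.assertEquals(0, deleteDeltas.length);
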
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 7535f84a5e..1138f1108b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -329,12 +329,9 @@ public void testCleaner2() throws Exception {
     txnMgr2 = swapTxnManager(txnMgr1);
     driver2 = swapDrivers(driver1);
     runStatementOnDriver("alter table T compact 'minor'");//T4
-    TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2 & delete_delta_1_2
+    TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2
     /*
     Now we should have target/warehouse/t/
-    ├── delete_delta_0000001_0000002_v0000019
-    │   ├── _orc_acid_version
-    │   └── bucket_00000
     ├── delta_0000001_0000001_0000
     │   ├── _orc_acid_version
     │   └── bucket_00000
@@ -350,7 +347,6 @@ public void testCleaner2() throws Exception {
         FileUtils.HIDDEN_FILES_PATH_FILTER);
     String[] expectedList = new String[] {
-        "/t/delete_delta_0000001_0000002_v0000019",
         "/t/delta_0000001_0000002_v0000019",
         "/t/delta_0000001_0000001_0000",
         "/t/delta_0000002_0000002_0000",
@@ -370,8 +366,7 @@ public void testCleaner2() throws Exception {
     txnMgr1 = swapTxnManager(txnMgr2);
     driver1 = swapDrivers(driver2);
     runStatementOnDriver("commit");//commits T3
-    //so now cleaner should be able to delete delta_0000001_0000001_0000
-    // & delta_0000002_0000002_0000
+    //so now cleaner should be able to delete delta_0000002_0000002_0000
 
     //insert a row so that compactor makes a new delta (due to HIVE-20901)
     runStatementOnDriver("insert into T values(2,5)");//makes delta_3_3 in T5
@@ -379,13 +374,12 @@ public void testCleaner2() throws Exception {
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     /*
-    at this point delete|delta_0000001_0000003_v0000022 are visible to everyone
-    so cleaner removes all files shadowed by them (which is everything in this case)
+    at this point delta_0000001_0000003_v0000022 is visible to everyone
+    so cleaner removes all files shadowed by it (which is everything in this case)
     */
     TestTxnCommands2.runCleaner(hiveConf);
     expectedList = new String[] {
-        "/t/delete_delta_0000001_0000003_v0000022",
         "/t/delta_0000001_0000003_v0000022"
     };
     actualList = fs.listStatus(new Path(warehousePath + "/t"),
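For reference, the directory names asserted in this test follow the compacted-delta convention delta_<minWriteId>_<maxWriteId>_v<visibilityTxnId>, with each id zero-padded to seven digits. A quick way to sanity-check a name against that shape; the regex is inferred from the names appearing in this test, not an existing Hive API:

  import java.util.regex.Pattern;

  class CompactedDeltaNameCheck {
    private static final Pattern COMPACTED_DELTA =
        Pattern.compile("delta_\\d{7}_\\d{7}_v\\d{7}");

    public static void main(String[] args) {
      // Matches the minor-compaction output asserted above...
      System.out.println(COMPACTED_DELTA.matcher("delta_0000001_0000003_v0000022").matches());        // true
      // ...but not a delete delta, whose name carries the delete_ prefix
      // and which this patch stops producing for insert-only input.
      System.out.println(COMPACTED_DELTA.matcher("delete_delta_0000001_0000003_v0000022").matches()); // false
    }
  }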