diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java index 78174f345b..20de9dc4f7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -148,7 +148,7 @@ private void createTable(String tblName, boolean isPartitioned, boolean isBucket /** * 5 txns. */ - void insertTestDataPartitioned(String tblName) throws Exception { + private void insertTestDataPartitioned(String tblName) throws Exception { executeStatementOnDriver("insert into " + tblName + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow')," + "('2',3, 'yesterday'),('2',4, 'today')", driver); @@ -164,7 +164,7 @@ void insertTestDataPartitioned(String tblName) throws Exception { /** * 3 txns. */ - protected void insertMmTestDataPartitioned(String tblName) throws Exception { + private void insertMmTestDataPartitioned(String tblName) throws Exception { executeStatementOnDriver("insert into " + tblName + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow')," + "('2',3, 'yesterday'),('2',4, 'today')", driver); @@ -175,10 +175,28 @@ protected void insertMmTestDataPartitioned(String tblName) throws Exception { + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver); } + + protected void insertTestData(String tblName, boolean isPartitioned) throws Exception { + if (isPartitioned) { + insertTestDataPartitioned(tblName); + } else { + insertTestDataNotPartitioned(tblName); + } + } + + + protected void insertMmTestData(String tblName, boolean isPartitioned) throws Exception { + if (isPartitioned) { + insertMmTestDataPartitioned(tblName); + } else { + insertMmTestDataNotPartitioned(tblName); + } + } + /** * 5 txns. */ - protected void insertTestData(String tblName) throws Exception { + private void insertTestDataNotPartitioned(String tblName) throws Exception { executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)", driver); executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)", @@ -192,7 +210,7 @@ protected void insertTestData(String tblName) throws Exception { /** * 3 txns. 
*/ - protected void insertMmTestData(String tblName) throws Exception { + private void insertMmTestDataNotPartitioned(String tblName) throws Exception { executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)", driver); executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)", diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 9659a3f048..ecda03a739 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -151,7 +151,7 @@ public void testMinorCompactionNotPartitionedWithoutBuckets() throws Exception { Table table = msClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); // Verify deltas @@ -221,7 +221,7 @@ public void testMinorCompactionNotPartitionedWithBuckets() throws Exception { Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); // Verify deltas @@ -294,7 +294,7 @@ public void testMinorCompactionPartitionedWithoutBuckets() throws Exception { Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestDataPartitioned(tableName); + dataProvider.insertTestData(tableName, true); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); // Verify deltas @@ -374,7 +374,7 @@ public void testMinorCompactionPartitionedWithBuckets() throws Exception { Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestDataPartitioned(tableName); + dataProvider.insertTestData(tableName, true); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); // Verify deltas @@ -511,7 +511,7 @@ public void testMultipleMinorCompactions() throws Exception { Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); // Clean up resources @@ -522,7 +522,7 @@ public void testMultipleMinorCompactions() throws Exception { Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); // Clean up 
resources @@ -532,7 +532,7 @@ public void testMultipleMinorCompactions() throws Exception { Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); // Clean up resources @@ -691,7 +691,7 @@ public void testMajorCompactionAfterMinor() throws Exception { Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); @@ -715,7 +715,7 @@ public void testMajorCompactionAfterMinor() throws Exception { List actualData = dataProvider.getAllData(tableName); Assert.assertEquals(expectedData, actualData); // Insert another round of test data - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); // Run a compaction @@ -747,7 +747,7 @@ public void testMinorCompactionAfterMajor() throws Exception { Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); @@ -768,7 +768,7 @@ public void testMinorCompactionAfterMajor() throws Exception { List actualData = dataProvider.getAllData(tableName); Assert.assertEquals(expectedData, actualData); // Insert another round of test data - dataProvider.insertTestData(tableName); + dataProvider.insertTestData(tableName, false); expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); // Run a compaction @@ -931,7 +931,7 @@ public void testLlapCacheOffDuringCompaction() throws Exception { QueryCompactor qc = new QueryCompactor() { @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidUtils.Directory dir) throws IOException { } @Override @@ -956,4 +956,164 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries); Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT)); } + + @Test public void testMajorCompactionWithMergeNotPartitionedWithoutBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MAJOR, false, false, null, Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("base_0000003_v0000007"), true, true); + } + + @Test public void testMajorCompactionWithMergePartitionedWithoutBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MAJOR, true, false, "ds=today", 
Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("base_0000003_v0000007"), true, true); + } + + @Test public void testMajorCompactionWithMergeNotPartitionedWithBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MAJOR, false, true, null, Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("base_0000003_v0000007"), true, true); + } + + @Test public void testMajorCompactionWithMergerPartitionedWithBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today", Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("base_0000003_v0000007"), true, true); + } + + @Test + public void testMinorCompactionWithMergeNotPartitionedWithoutBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MINOR, false, false, null, + Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("delta_0000001_0000003_v0000007"), true, true); + } + + @Test + public void testMinorCompactionWithMergePartitionedWithoutBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MINOR, true, false, "ds=today", + Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("delta_0000001_0000003_v0000007"), true, true); + } + + @Test + public void testMinorCompactionWithMergeNotPartitionedWithBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MINOR, false, true, null, + Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("delta_0000001_0000003_v0000007"), true, true); + } + + @Test + public void testMinorCompactionWithMergePartitionedWithBuckets() throws Exception { + testCompactionWithMerge(CompactionType.MINOR, true, true, "ds=today", + Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("delta_0000001_0000003_v0000007"), true, true); + } + + @Test + public void testMajorCompactionAfterMinorWithMerge() throws Exception { + testCompactionWithMerge(CompactionType.MINOR, true, true, "ds=today", + Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("delta_0000001_0000003_v0000007"),true, false); + testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today", + Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000003_v0000007", "delta_0000004_0000004_0000", "delta_0000005_0000005_0000", + "delta_0000006_0000006_0000"), Collections.singletonList("base_0000006_v0000013"), false, true); + } + + @Test + public void testMinorCompactionAfterMajorWithMerge() throws Exception { + testCompactionWithMerge(CompactionType.MAJOR, false, false, null, + Collections.singletonList("bucket_00000"), + 
Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("base_0000003_v0000007"), true, false); + testCompactionWithMerge(CompactionType.MINOR, false, false, null, + Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000004_0000004_0000", "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), + Collections.singletonList("delta_0000001_0000006_v0000013"), false, true); + } + + @Test + public void testMultipleMajorCompactionWithMerge() throws Exception { + testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today", + Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("base_0000003_v0000007"), true, false); + testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today", + Arrays.asList("bucket_00000", "bucket_00001"), + Arrays.asList("delta_0000004_0000004_0000", "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), + Collections.singletonList("base_0000006_v0000013"), false, true); + } + + @Test + public void testMultipleMinorCompactionWithMerge() throws Exception { + testCompactionWithMerge(CompactionType.MINOR, false, false, null, + Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000003_0000003_0000"), + Collections.singletonList("delta_0000001_0000003_v0000007"), true, false); + testCompactionWithMerge(CompactionType.MINOR, false, false, null, + Collections.singletonList("bucket_00000"), + Arrays.asList("delta_0000001_0000003_v0000007", "delta_0000004_0000004_0000", "delta_0000005_0000005_0000", + "delta_0000006_0000006_0000"), + Collections.singletonList("delta_0000001_0000006_v0000013"), false, true); + } + + private void testCompactionWithMerge(CompactionType compactionType, boolean isPartitioned, boolean isBucketed, + String partitionName, List bucketName, List deltaDirNames, List compactDirNames, + boolean createTable, boolean dropTable) throws Exception { + String dbName = "default"; + String tableName = "testCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + if (createTable) { + dataProvider.createFullAcidTable(tableName, isPartitioned, isBucketed); + } + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertMmTestData(tableName, isPartitioned); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + // Verify deltas + Assert.assertEquals("Delta directories does not match", + deltaDirNames, CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionName)); + // Verify delete delta + Assert.assertTrue(CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, + partitionName).isEmpty()); + // Run a compaction + if (partitionName == null) { + CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true); + } else { + CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true, partitionName); + } + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // Only 1 compaction should be in the response queue with succeeded state + List compacts = + 
TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + compacts.forEach(c -> Assert.assertEquals("succeeded", c.getState())); + // Verify directories after compaction + List actualDirAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, compactionType == CompactionType.MAJOR ? AcidUtils.baseFileFilter : + AcidUtils.deltaFileFilter, table, partitionName); + Assert.assertEquals("Base directory does not match after compaction", compactDirNames, actualDirAfterComp); + Assert.assertTrue(CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, + partitionName).isEmpty()); + // Verify bucket files in delta dirs + Assert.assertEquals("Bucket names are not matching after compaction", bucketName, + CompactorTestUtil.getBucketFileNames(fs, table, partitionName, actualDirAfterComp.get(0))); + // Verify all contents + List actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Clean up + if (dropTable) { + dataProvider.dropTable(tableName); + } + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java index 074430ce7f..71662e89c8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java @@ -59,7 +59,7 @@ Table table = msClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - testDataProvider.insertMmTestData(tableName); + testDataProvider.insertMmTestData(tableName, false); // Get all data before compaction is run List expectedData = testDataProvider.getAllData(tableName); // Verify deltas @@ -98,7 +98,7 @@ Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - testDataProvider.insertMmTestData(tableName); + testDataProvider.insertMmTestData(tableName, false); // Get all data before compaction is run List expectedData = testDataProvider.getAllData(tableName); // Verify deltas @@ -139,7 +139,7 @@ Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertMmTestDataPartitioned(tableName); + dataProvider.insertMmTestData(tableName, true); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); // Verify deltas @@ -211,7 +211,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertMmTestDataPartitioned(tableName); + dataProvider.insertMmTestData(tableName, true); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); // Verify deltas @@ -296,20 +296,20 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); CompactorTestUtil.runCleaner(conf); 
verifySuccessulTxn(1); List compacts; // Insert test data into test table - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); CompactorTestUtil.runCleaner(conf); verifySuccessulTxn(2); // Insert test data into test table - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); CompactorTestUtil.runCleaner(conf); @@ -318,7 +318,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro List actualDeltasAfterComp = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); Assert.assertEquals("Delta directories does not match after compaction", - Collections.singletonList("delta_0000001_0000009_v0000026"), actualDeltasAfterComp); + Collections.singletonList("delta_0000001_0000009_v0000014"), actualDeltasAfterComp); } @@ -333,7 +333,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); @@ -349,7 +349,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro verifyAllContents(tableName, dataProvider, expectedData); List actualData; // Insert a second round of test data into test table; update expectedData - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); expectedData = dataProvider.getAllData(tableName); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); @@ -357,7 +357,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro verifySuccessulTxn(2); // Verify base directory after compaction Assert.assertEquals("Base directory does not match after major compaction", - Collections.singletonList("base_0000006_v0000019"), + Collections.singletonList("base_0000006_v0000013"), CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); actualData = dataProvider.getAllData(tableName); Assert.assertEquals(expectedData, actualData); @@ -374,7 +374,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); // Get all data before compaction is run List expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); @@ -389,7 +389,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); verifyAllContents(tableName, dataProvider, expectedData); // Insert test data into test table - dataProvider.insertMmTestData(tableName); + dataProvider.insertMmTestData(tableName, false); expectedData = dataProvider.getAllData(tableName); Collections.sort(expectedData); // Run a compaction @@ -401,7 +401,7 @@ private void 
testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Collections.singletonList("base_0000003_v0000007"), CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); Assert.assertEquals("Delta directories does not match after minor compaction", - Collections.singletonList("delta_0000001_0000006_v0000016"), + Collections.singletonList("delta_0000001_0000006_v0000013"), CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); verifyAllContents(tableName, dataProvider, expectedData); } @@ -498,7 +498,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Table table = metaStoreClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); // Insert test data into test table - testDataProvider.insertMmTestData(tableName); + testDataProvider.insertMmTestData(tableName, false); // Get all data before compaction is run. Expected data is 2 x MmTestData insertion List expectedData = new ArrayList<>(); List oneMmTestDataInsertion = testDataProvider.getAllData(tableName); @@ -507,7 +507,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro Collections.sort(expectedData); // Insert an aborted directory (txns 4-6) rollbackAllTxns(true, driver); - testDataProvider.insertMmTestData(tableName); + testDataProvider.insertMmTestData(tableName, false); rollbackAllTxns(false, driver); // Check that delta dirs 4-6 exist List actualDeltasAfterComp = @@ -517,7 +517,7 @@ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) thro "delta_0000003_0000003_0000", "delta_0000004_0000004_0000", "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), actualDeltasAfterComp); // Insert another round of test data (txns 7-9) - testDataProvider.insertMmTestData(tableName); + testDataProvider.insertMmTestData(tableName, false); verifyAllContents(tableName, testDataProvider, expectedData); // Run a minor compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index c44f2b5026..32b41d26b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -228,7 +228,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor */ QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci); if (queryCompactor != null) { - queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci); + queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci, dir); return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 9385080713..692701bad1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -26,10 +26,10 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; + import 
java.io.IOException; import java.util.List; @@ -40,7 +40,7 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidUtils.Directory dir) throws IOException, HiveException { AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); @@ -52,19 +52,17 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD * with same bucket number in one map task. */ conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + Path outputDirPath = Util.getCompactionOutputDirPath(conf, writeIds, true, storageDescriptor); + if (!Util.hasDeleteOrAbortedDirectories(dir, writeIds)) { + // Only inserts happened, it is much more performant to merge the files than running a query + Util.mergeOrcFiles(conf, true, dir, outputDirPath, false); + return; + } String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_"; String tmpTableName = tmpPrefix + System.currentTimeMillis(); - long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId(); - long highWaterMark = writeIds.getHighWatermark(); - long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true) - .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId) - .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId); - Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options); - - List createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString()); + List createQueries = getCreateQueries(tmpTableName, table, outputDirPath.toString()); List compactionQueries = getCompactionQueries(table, partition, tmpTableName); List dropQueries = getDropQueries(tmpTableName); runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index 01cd2fc93d..17110aa28a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,19 +48,22 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidUtils.Directory dir) throws IOException, HiveException { LOG.info("Running query based minor compaction"); AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); - AcidUtils.Directory dir = AcidUtils - .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, - table.getParameters(), false); // Set up 
the session for driver. HiveConf conf = new HiveConf(hiveConf); conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false); conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false); + + if (!Util.hasDeleteOrAbortedDirectories(dir, writeIds)) { + Util.mergeOrcFiles(conf, false, dir, + Util.getCompactionOutputDirPath(conf, writeIds, false, storageDescriptor), false); + return; + } String tmpTableName = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 41fdd7e210..45b6f7c7d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,21 +45,24 @@ private static final Logger LOG = LoggerFactory.getLogger(MmMajorQueryCompactor.class.getName()); @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidUtils.Directory dir) throws IOException { LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table .getTableName()); - AcidUtils.Directory dir = AcidUtils - .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, - table.getParameters(), false); MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); - String tmpLocation = Util.generateTmpPath(storageDescriptor); - Path baseLocation = new Path(tmpLocation, "_base"); - // Set up the session for driver. HiveConf driverConf = new HiveConf(hiveConf); driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + if (storageDescriptor.getOutputFormat().equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat") + && !Util.hasDeleteOrAbortedDirectories(dir, writeIds)) { + Util.mergeOrcFiles(driverConf, true, dir, + Util.getCompactionOutputDirPath(driverConf, writeIds, true, storageDescriptor), true); + return; + } + + String tmpLocation = Util.generateTmpPath(storageDescriptor); + Path baseLocation = new Path(tmpLocation, "_base"); // Note: we could skip creating the table and just add table type stuff directly to the // "insert overwrite directory" command if there were no bucketing or list bucketing. 
String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index feb667cba9..90057a804e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,23 +47,26 @@ private static final Logger LOG = LoggerFactory.getLogger(MmMinorQueryCompactor.class.getName()); - @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, - StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) - throws IOException { + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidUtils.Directory dir) throws IOException { LOG.debug( "Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table.getTableName()); - AcidUtils.Directory dir = AcidUtils - .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, - Ref.from(false), false, table.getParameters(), false); MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); + HiveConf driverConf = setUpDriverSession(hiveConf); + + if (storageDescriptor.getOutputFormat().equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat") + && !Util.hasDeleteOrAbortedDirectories(dir, writeIds)) { + Util.mergeOrcFiles(driverConf, false, dir, + Util.getCompactionOutputDirPath(driverConf, writeIds, false, storageDescriptor), true); + return; + } + String tmpLocation = Util.generateTmpPath(storageDescriptor); Path sourceTabLocation = new Path(tmpLocation); Path resultTabLocation = new Path(tmpLocation, "_result"); - HiveConf driverConf = setUpDriverSession(hiveConf); - String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_"; String tmpTableBase = tmpPrefix + System.currentTimeMillis(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OrcFileMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OrcFileMerger.java new file mode 100644 index 0000000000..d459931675 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OrcFileMerger.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class to support fast merging of ORC files.
+ */
+public class OrcFileMerger {
+
+  private final Configuration conf;
+  private final static Logger LOG = LoggerFactory.getLogger(OrcFileMerger.class);
+
+  public OrcFileMerger(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Merge orc files into a single file.
+   * @param files list of orc file paths to be merged
+   * @param outPath the path of the output orc file
+   * @throws IOException error happened during file operations
+   */
+  public void mergeFiles(List<Path> files, Path outPath) throws IOException {
+    Writer writer = null;
+    List<Reader> readers = new ArrayList<>();
+    for (Path path : files) {
+      FileSystem fs = path.getFileSystem(conf);
+      readers.add(OrcFile.createReader(fs, path));
+    }
+
+    if (!checkCompatibility(readers)) {
+      return;
+    }
+    try {
+      for (int i = 0; i < readers.size(); i++) {
+        Reader reader = readers.get(i);
+        Path path = files.get(i);
+        LOG.info("ORC merge file input path: {}", path.toString());
+        if (writer == null) {
+          writer = setupWriter(reader, outPath);
+        }
+        VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+        RecordReader rows = reader.rows();
+        while (rows.nextBatch(batch)) {
+          if (batch != null) {
+            writer.addRowBatch(batch);
+          }
+        }
+        rows.close();
+      }
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * Create a new instance of orc output writer. The writer parameters are collected from the orc reader.
+   * @param reader orc reader of file
+   * @param outPath the path of the output file
+   * @return a new instance of orc writer, always non-null
+   * @throws IOException error during file operation
+   */
+  private Writer setupWriter(Reader reader, Path outPath) throws IOException {
+    OrcFile.WriterOptions options =
+        OrcFile.writerOptions(conf).compress(reader.getCompression()).version(reader.getFileVersion())
+            .rowIndexStride(reader.getRowIndexStride()).inspector(reader.getObjectInspector());
+    if (CompressionKind.NONE != reader.getCompression()) {
+      options.bufferSize(reader.getCompressionSize()).enforceBufferSize();
+    }
+    Writer writer = OrcFile.createWriter(outPath, options);
+    LOG.info("ORC merge file output path: " + outPath);
+    return writer;
+  }
+
+  /**
+   * Check compatibility between readers.
+   * @param readers list of readers
+   * @return true, if the readers are compatible
+   */
+  private boolean checkCompatibility(final List<Reader> readers) {
+    if (readers == null || readers.isEmpty()) {
+      return false;
+    }
+
+    if (!readers.stream().allMatch(
+        r -> readers.get(0).getSchema().equals(r.getSchema()) && readers.get(0).getCompression()
+            .equals(r.getCompression()) && readers.get(0).getCompressionSize() == r.getCompressionSize() && readers
+            .get(0).getFileVersion().equals(r.getFileVersion()) && readers.get(0).getWriterVersion()
+            .equals(r.getWriterVersion()) && readers.get(0).getRowIndexStride() == r.getRowIndexStride())) {
+      LOG.warn("Incompatible ORC file merge!");
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 7a9e48ff1e..228161db77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -29,6 +30,8 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -37,8 +40,12 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.regex.Matcher;
 
 /**
  * Common interface for query based compactions.
@@ -59,7 +66,7 @@
    * @throws IOException compaction cannot be finished.
    */
   abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException;
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidUtils.Directory dir) throws IOException, HiveException;
 
   /**
    * This is the final step of the compaction, which can vary based on compaction type. Usually this involves some file
@@ -203,5 +210,117 @@ static void cleanupEmptyDir(HiveConf conf, String tmpTableName) throws IOExcepti
         }
       }
     }
+
+    /**
+     * Scan a directory for delete deltas or aborted directories.
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @return true, if delete or aborted directory found
+     */
+    static boolean hasDeleteOrAbortedDirectories(AcidUtils.Directory directory, ValidWriteIdList validWriteIdList) {
+      if (!directory.getCurrentDirectories().isEmpty()) {
+        final long minWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+        final long maxWriteId = validWriteIdList.getHighWatermark();
+        return directory.getCurrentDirectories().stream()
+            .filter(AcidUtils.ParsedDeltaLight::isDeleteDelta)
+            .filter(delta -> delta.getMinWriteId() >= minWriteId)
+            .anyMatch(delta -> delta.getMaxWriteId() <= maxWriteId) || !directory.getAbortedDirectories().isEmpty();
+      }
+      return true;
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same bucket Id. This method scans all the base
+     * and delta dirs.
+     * @param conf hive configuration, must be not null
+     * @param dir the root directory of delta dirs
+     * @param includeBaseDir true, if the base directory should be scanned
+     * @param isMm true, if the bucket files are collected from insert only directories
+     * @return map of bucket ID -> bucket files
+     * @throws IOException an error happened during the reading of the directory/bucket file
+     */
+    private static Map<Integer, List<Path>> matchBucketIdToBucketFiles(HiveConf conf, AcidUtils.Directory dir,
+        boolean includeBaseDir, boolean isMm) throws IOException {
+      Map<Integer, List<Path>> result = new HashMap<>();
+      if (includeBaseDir && dir.getBaseDirectory() != null) {
+        getBucketFiles(conf, dir.getBaseDirectory(), isMm, result);
+      }
+      for (AcidUtils.ParsedDelta deltaDir : dir.getCurrentDirectories()) {
+        Path deltaDirPath = deltaDir.getPath();
+        getBucketFiles(conf, deltaDirPath, isMm, result);
+      }
+      return result;
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same bucket Id. This method checks only one
+     * directory.
+     * @param conf hive configuration, must be not null
+     * @param dirPath the directory to be scanned
+     * @param isMm collect bucket files from insert only directories
+     * @param bucketIdToBucketFilePath the result of the scan
+     * @throws IOException an error happened during the reading of the directory/bucket file
+     */
+    private static void getBucketFiles(HiveConf conf, Path dirPath, boolean isMm,
+        Map<Integer, List<Path>> bucketIdToBucketFilePath) throws IOException {
+      FileSystem fs = dirPath.getFileSystem(conf);
+      FileStatus[] fileStatuses =
+          fs.listStatus(dirPath, isMm ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter);
+      for (FileStatus f : fileStatuses) {
+        final Path fPath = f.getPath();
+        Matcher matcher = isMm ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN
+            .matcher(fPath.getName()) : AcidUtils.BUCKET_PATTERN.matcher(fPath.getName());
+        if (!matcher.find()) {
+          String errorMessage = String
+              .format("Found a bucket file not matching the bucket pattern! %s Matcher=%s", fPath.toString(),
+                  matcher.toString());
+          LOG.error(errorMessage);
+          throw new IllegalArgumentException(errorMessage);
+        }
+        int bucketNum = matcher.groupCount() > 0 ? Integer.parseInt(matcher.group(1)) : Integer.parseInt(matcher.group());
+        bucketIdToBucketFilePath.computeIfAbsent(bucketNum, ArrayList::new);
+        bucketIdToBucketFilePath.computeIfPresent(bucketNum, (k, v) -> v).add(fPath);
+      }
+    }
+
+    /**
+     * Generate output path for compaction. This can be used to generate delta or base directories.
+     * @param conf hive configuration, must be non-null
+     * @param writeIds list of valid write IDs
+     * @param isBaseDir if base directory path should be generated
+     * @param sd the resolved storage descriptor
+     * @return output path, always non-null
+     */
+    static Path getCompactionOutputDirPath(HiveConf conf, ValidWriteIdList writeIds, boolean isBaseDir,
+        StorageDescriptor sd) {
+      long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+      long highWatermark = writeIds.getHighWatermark();
+      long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+      AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(isBaseDir)
+          .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+          .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+      return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), options);
+    }
+
+    /**
+     * Merge ORC files from base/delta directories. If the directories contain multiple buckets, the result will also
+     * contain the same number of buckets.
+     * @param conf hive configuration
+     * @param includeBaseDir if base directory should be scanned for orc files
+     * @param dir the root directory of the table/partition
+     * @param outputDirPath the result directory path
+     * @param isMm merge orc files from insert only tables
+     * @throws IOException error occurred during file operation
+     */
+    static void mergeOrcFiles(HiveConf conf, boolean includeBaseDir, AcidUtils.Directory dir,
+        Path outputDirPath, boolean isMm) throws IOException {
+      Map<Integer, List<Path>> bucketIdToBucketFiles = matchBucketIdToBucketFiles(conf, dir, includeBaseDir, isMm);
+      OrcFileMerger fileMerger = new OrcFileMerger(conf);
+      for (Map.Entry<Integer, List<Path>> e : bucketIdToBucketFiles.entrySet()) {
+        Path path = isMm ? new Path(outputDirPath, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS,
+            e.getKey()) + "_0") : new Path(outputDirPath, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS,
+            e.getKey()));
+        fileMerger.mergeFiles(e.getValue(), path);
+      }
+    }
   }
 }
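
For context, a minimal sketch of how the new fast merge path introduced by this patch could be driven directly. The table layout, file paths and the wrapper class name below are hypothetical; OrcFileMerger and its mergeFiles(List<Path>, Path) method come from the diff above. In the patch itself this code is only reached through Util.mergeOrcFiles, after Util.hasDeleteOrAbortedDirectories confirms there are neither delete deltas nor aborted directories to worry about.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.txn.compactor.OrcFileMerger;

import java.util.Arrays;
import java.util.List;

public class OrcFileMergerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Two ORC bucket files written by separate insert-only transactions (hypothetical locations).
    List<Path> inputs = Arrays.asList(
        new Path("/warehouse/t/delta_0000001_0000001_0000/bucket_00000"),
        new Path("/warehouse/t/delta_0000002_0000002_0000/bucket_00000"),
        new Path("/warehouse/t/delta_0000003_0000003_0000/bucket_00000"));
    // Target bucket file inside the compacted directory (hypothetical name). The merge only proceeds
    // if all readers agree on schema, compression, compression buffer size, file version, writer
    // version and row index stride; otherwise checkCompatibility logs a warning and nothing is written.
    Path output = new Path("/warehouse/t/base_0000003_v0000007/bucket_00000");
    new OrcFileMerger(conf).mergeFiles(inputs, output);
  }
}

Because the stripes are copied batch by batch rather than re-read through a compaction query, this path avoids starting a Tez job entirely, which is why the tests above only exercise it when the deltas contain plain inserts.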