diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 37c5314..b54a95d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -250,10 +250,9 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
       }
     }
 
-    if (parsedDeltas.size() == 0) {
-      // Seriously, no deltas? Can't compact that.
-      LOG.error( "No delta files found to compact in " + sd.getLocation());
-      //couldn't someone want to run a Major compaction to convert old table to ACID?
+    if (parsedDeltas.size() == 0 && dir.getOriginalFiles() == null) {
+      // Skip compaction if there are no delta files AND no original files
+      LOG.error("No delta files or original files found to compact in " + sd.getLocation());
       return;
     }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a1bd0fb..e76c925 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -270,10 +270,15 @@ public void testNonAcidInsert() throws Exception {
 
   /**
    * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Insert a row to ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
    * @throws Exception
    */
   @Test
-  public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
+  public void testNonAcidToAcidConversion1() throws Exception {
     FileSystem fs = FileSystem.get(hiveConf);
     FileStatus[] status;
 
@@ -394,6 +399,307 @@ public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
 
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Update the existing row in ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion2() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Update the existing row in newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+    // The delta directory should have 1 bucket file, since the update only touched one bucket
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_0000001.
+    // Original bucket files and delta directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 4 items:
+    // 2 original files, 1 delta directory and 1 base directory
+    Assert.assertEquals(4, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files and delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Perform Major compaction
+   * 4. Update the existing row and insert a new row to ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion3() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(3, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Update the existing row, and insert another row to newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
+    // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
+    // plus two new delta directories
+    Assert.assertEquals(5, status.length);
+    int numDelta = 0;
+    sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        numDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDelta == 1) {
+          Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numDelta == 2) {
+          Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT, buckets.length);
+          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertEquals(2, numDelta);
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000002
+    // Original bucket files, delta directories and the previous base directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);
+    Assert.assertEquals(6, status.length);
+    int numBase = 0;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        numBase++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numBase == 1) {
+          Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numBase == 2) {
+          // The new base dir now has two bucket files, since the delta dir has two bucket files
+          Assert.assertEquals("base_0000002", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT, buckets.length);
+          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+        }
+      }
+    }
+    Assert.assertEquals(2, numBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 6 items:
+    // 2 original files, 2 delta directories and 2 base directories
+    Assert.assertEquals(6, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000002.
+    // Original bucket files, delta directories and previous base directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertEquals("base_0000002", status[0].getPath().getName());
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(buckets);
+    Assert.assertEquals(BUCKET_COUNT, buckets.length);
+    Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
+    Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
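
Note on the directory lifecycle the tests above assert: once a major compaction has written a base directory, the pre-conversion bucket files (000000_0, 000001_0), the delta directories, and any older base become obsolete, and only the newest base survives the Cleaner. The following self-contained sketch models that rule in plain Java. It is illustrative only, not Hive's actual Cleaner implementation; the class name AcidLayoutSketch and its helper methods are made up for demonstration.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Illustrative model only (not Hive's Cleaner): after a major compaction writes a
 * base_N directory, every original bucket file, delta directory and older base is
 * obsolete; only the base with the highest transaction id survives cleaning.
 */
public class AcidLayoutSketch {

  /** Extracts the txn id from a "base_N" name, or empty if the entry is not a base. */
  static Optional<Long> baseTxnId(String name) {
    return name.startsWith("base_")
        ? Optional.of(Long.parseLong(name.substring("base_".length())))
        : Optional.empty();
  }

  /** Returns the entries that would remain after cleaning, under the model above. */
  static List<String> afterCleaning(List<String> entries) {
    Optional<Long> newestBase = entries.stream()
        .map(AcidLayoutSketch::baseTxnId)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .max(Long::compare);
    if (!newestBase.isPresent()) {
      return entries;  // no base yet, so nothing is obsolete in this simplified model
    }
    return entries.stream()
        .filter(e -> baseTxnId(e).map(id -> id.equals(newestBase.get())).orElse(false))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Directory listing from step 5 of testNonAcidToAcidConversion3, before the Cleaner runs.
    // Note that base_-9223372036854775808 (Long.MIN_VALUE) is the base produced by
    // compacting only the original files, exactly the case the CompactorMR fix enables.
    List<String> entries = Arrays.asList(
        "000000_0", "000001_0",
        "delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
        "base_-9223372036854775808", "base_0000002");
    // Prints [base_0000002], matching the test's post-clean assertion
    System.out.println(afterCleaning(entries));
  }
}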