diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 3a5a325..d070477 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -250,10 +250,9 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
       }
     }
 
-    if (parsedDeltas.size() == 0) {
-      // Seriously, no deltas? Can't compact that.
-      LOG.error( "No delta files found to compact in " + sd.getLocation());
-      //couldn't someone want to run a Major compaction to convert old table to ACID?
+    if (parsedDeltas.size() == 0 && dir.getOriginalFiles() == null) {
+      // Skip compaction only if there are no delta files AND no original files
+      LOG.error("No delta files or original files found to compact in " + sd.getLocation());
       return;
     }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a1bd0fb..3bad794 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -270,10 +270,15 @@ public void testNonAcidInsert() throws Exception {
 
   /**
    * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Insert a row to ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
    * @throws Exception
    */
   @Test
-  public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
+  public void testNonAcidToAcidConversion1() throws Exception {
     FileSystem fs = FileSystem.get(hiveConf);
     FileStatus[] status;
 
@@ -394,6 +399,295 @@ public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
 
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Update the existing row in ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion2() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int[][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
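+    // Converting to ACID below is a metadata-only change: it just sets the
+    // 'transactional' table property and rewrites no data, so the original
+    // bucket files are expected to stay in place until a compaction runs.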
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be the same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Update the existing row in newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+    // The delta directory should have 1 bucket file (bucket_00000 or bucket_00001),
+    // since only the bucket holding the updated row gets a delta file.
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_0000001.
+    // Original bucket files and delta directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
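+    // The base directory name records the highest transaction id the compaction
+    // covered: the update above ran as txn 1, hence base_0000001.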
+    // 5. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 4 items:
+    // 2 original files, 1 delta directory and 1 base directory
+    Assert.assertEquals(4, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files and delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table
+   * 3. Perform Major compaction
+   * 4. Insert a new row to ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversion3() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int[][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be the same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
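+    // The compaction below sees no delta directories, only pre-conversion original
+    // files. Those files carry no transaction ids, so the resulting base directory
+    // is named with Long.MIN_VALUE: base_-9223372036854775808.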
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(3, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertTrue(status[i].getPath().getName().matches("base_-9223372036854775808"));
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Insert another row to newly-converted ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), the existing base directory,
+    // plus a new delta directory with 2 bucket files (bucket_00000 and bucket_00001)
+    Assert.assertEquals(4, status.length);
+    boolean sawNewDelta = false;
+    sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        Assert.assertTrue(status[i].getPath().getName().matches("delta_0000001_0000001_0000"));
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_00001"));
+      } else if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertTrue(status[i].getPath().getName().matches("base_-9223372036854775808"));
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
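+    // Delta directory names follow delta_<minTxnId>_<maxTxnId>_<stmtId>; the insert
+    // above was txn 1, hence delta_0000001_0000001_0000 (the trailing 0000 being the
+    // statement id within that transaction).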
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000001
+    // Original bucket files, delta directory and the previous base directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(5, status.length);
+    int numBase = 0;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        numBase++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        if (numBase == 1) {
+          Assert.assertTrue(status[i].getPath().getName().matches("base_-9223372036854775808"));
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+        } else if (numBase == 2) {
+          // The new base dir now has two bucket files, since the delta dir has two bucket files
+          Assert.assertTrue(status[i].getPath().getName().matches("base_0000001"));
+          Assert.assertEquals(BUCKET_COUNT, buckets.length);
+          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_00001"));
+        }
+      }
+    }
+    Assert.assertEquals(2, numBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 delta directory and 2 base directories
+    Assert.assertEquals(5, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files, delta directory and the previous base directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_0000001"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+    Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf