diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 6d4c198..452cbef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -69,6 +69,7 @@ public boolean accept(Path path) {
     }
   };
   public static final String BUCKET_DIGITS = "%05d";
+  public static final String LEGACY_FILE_BUCKET_DIGITS = "%06d";
   public static final String DELTA_DIGITS = "%07d";
   /**
    * 10K statements per tx.  Probably overkill ... since that many delta files
@@ -80,7 +81,7 @@ public boolean accept(Path path) {
    */
   public static final int MAX_STATEMENTS_PER_TXN = 10000;
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
-  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
   public static final PathFilter originalBucketFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -145,7 +146,7 @@ public static Path createFilename(Path directory,
                                     AcidOutputFormat.Options options) {
     String subdir;
     if (options.getOldStyle()) {
-      return new Path(directory, String.format(BUCKET_DIGITS,
+      return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
           options.getBucket()) + "_0");
     } else if (options.isWritingBase()) {
       subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
@@ -490,6 +491,9 @@ public static Directory getAcidState(Path directory,
     final List<FileStatus> original = new ArrayList<FileStatus>();
     // if we have a base, the original files are obsolete.
     if (bestBase != null) {
+      for (FileStatus fileStatus : original) {
+        obsolete.add(fileStatus);
+      }
       // remove the entries so we don't get confused later and think we should
       // use them.
       original.clear();
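Note: the two format strings above encode two distinct naming schemes. Pre-ACID ("old style") bucket files are named with six digits plus a "_0" suffix, while ACID bucket files use a "bucket_" prefix plus five digits; the old LEGACY_BUCKET_DIGIT_PATTERN only anchored five digits and so disagreed with what createFilename actually writes. A minimal standalone sketch of the two conventions (not part of the patch; the class name is invented for illustration):

    import java.util.regex.Pattern;

    public class BucketNameSketch {
      public static void main(String[] args) {
        // Pre-ACID ("old style") bucket file: six digits plus "_0", e.g. 000001_0
        String legacy = String.format("%06d", 1) + "_0";
        // ACID bucket file: "bucket_" plus five digits, e.g. bucket_00001
        String acid = "bucket_" + String.format("%05d", 1);
        // The corrected pattern anchors six leading digits, matching the legacy name
        System.out.println(Pattern.compile("^[0-9]{6}").matcher(legacy).find()); // true
        // Five trailing digits match the ACID-style name
        System.out.println(Pattern.compile("[0-9]{5}$").matcher(acid).find());   // true
      }
    }
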
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5aa2500..1b2352c 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.hive.ql;
 
-import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
@@ -242,6 +245,135 @@ public void testNonAcidInsert() throws Exception {
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
     List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
   }
+
+  /**
+   * Test query correctness and directory layout after ACID table conversion and MAJOR compaction.
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to the non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int[][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to an ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be the same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Insert another row to the newly-converted ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+    // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'MAJOR'");
+    Worker w = new Worker();
+    w.setThreadId((int) w.getId());
+    w.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    w.init(stop, looped);
+    w.run();
+    // There should be 1 new directory: base_xxxxxxx.
+    // Original bucket files and the delta directory should stay until the Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let the Cleaner delete obsolete files/dirs
+    Cleaner c = new Cleaner();
+    c.setThreadId((int) c.getId());
+    c.setHiveConf(hiveConf);
+    stop = new AtomicBoolean();
+    looped = new AtomicBoolean();
+    stop.set(true);
+    c.init(stop, looped);
+    c.run();
+    // There should be only 1 directory left: base_xxxxxxx.
+    // Original bucket files and the delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+    Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
   @Test
   public void testUpdateMixedCase() throws Exception {
     int[][] tableData = {{1,2},{3,3},{5,3}};
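Note: steps 4 and 5 of the test above exercise the core of the fix in AcidUtils.getAcidState: once a major compaction produces a base, the pre-ACID original files must be reported obsolete so the Cleaner actually deletes them (previously they were never added to the obsolete list). A simplified sketch of that rule, with invented names purely for illustration; the real logic lives inside getAcidState:

    import java.util.List;

    class ObsoleteOriginalsSketch {
      // If a valid base exists, every pre-ACID "original" file is superseded
      // by the base and must be reported obsolete for the Cleaner to remove.
      static List<String> computeObsolete(String bestBase, List<String> original,
                                          List<String> obsolete) {
        if (bestBase != null) {
          obsolete.addAll(original); // the fix: originals join the obsolete list
          original.clear();          // and are no longer treated as live data
        }
        return obsolete;
      }
    }
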
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f8ded12..1b598f7 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -42,10 +42,10 @@ public void testCreateFilename() throws Exception {
     Configuration conf = new Configuration();
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .setOldStyle(true).bucket(1);
-    assertEquals("/tmp/00001_0",
+    assertEquals("/tmp/000001_0",
         AcidUtils.createFilename(p, options).toString());
     options.bucket(123);
-    assertEquals("/tmp/00123_0",
+    assertEquals("/tmp/000123_0",
         AcidUtils.createFilename(p, options).toString());
     options.bucket(23)
         .minimumTransactionId(100)
@@ -224,10 +224,9 @@ public void testObsoleteOriginals() throws Exception {
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
-    // The two original buckets won't be in the obsolete list because we don't look at those
-    // until we have determined there is no base.
+    // The obsolete list should include the two original bucket files and the old base dir
     List<FileStatus> obsolete = dir.getObsolete();
-    assertEquals(1, obsolete.size());
+    assertEquals(3, obsolete.size());
     assertEquals("mock:/tbl/part1/base_5",
         obsolete.get(0).getPath().toString());
     assertEquals("mock:/tbl/part1/base_10",
         dir.getBaseDirectory().toString());
   }
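Note: the updated assertion pins the obsolete list's size and its first element (the superseded base_5). If the ordering of the two original files within that list were ever unspecified, an order-insensitive check along these lines would be a safer alternative; this is a fragment mirroring the test above, and only the base_5 path is asserted because the originals' exact mock names come from the test's unchanged setup:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.fs.FileStatus;
    import org.junit.Assert;

    class ObsoleteAssertions {
      // Drop-in alternative for the assertions above (same obsolete list).
      static void assertObsolete(List<FileStatus> obsolete) {
        Set<String> paths = new HashSet<String>();
        for (FileStatus stat : obsolete) {
          paths.add(stat.getPath().toString());
        }
        // Size and the superseded base are stable regardless of list order;
        // the two remaining entries are the original bucket files.
        Assert.assertEquals(3, paths.size());
        Assert.assertTrue(paths.contains("mock:/tbl/part1/base_5"));
      }
    }
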
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 39c0571..5545574 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -293,7 +293,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
       if (bucket == 0 && !allBucketsPresent) continue; // skip one
       Path partFile = null;
       if (type == FileType.LEGACY) {
-        partFile = new Path(location, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0");
+        partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
       } else {
         Path dir = new Path(location, filename);
         fs.mkdirs(dir);
@@ -337,7 +337,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
       FileSystem fs = p.getFileSystem(conf);
       if (fs.exists(p)) filesToRead.add(p);
     } else {
-      filesToRead.add(new Path(baseDirectory, "00000_0"));
+      filesToRead.add(new Path(baseDirectory, "000000_0"));
     }
   }
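Note: from a caller's perspective, the observable change in this patch is what getAcidState reports once a base exists. A hedged usage fragment, where the path and the transaction-list string are illustrative, echoing testObsoleteOriginals above:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    class AcidStateUsage {
      static void printObsolete(Configuration conf) throws IOException {
        AcidUtils.Directory dir = AcidUtils.getAcidState(
            new Path("/tbl/part1"), conf, new ValidReadTxnList("150:"));
        if (dir.getBaseDirectory() != null) {
          // With this patch, pre-ACID originals show up here once a base
          // exists, so the Cleaner can remove them after a major compaction.
          for (FileStatus stale : dir.getObsolete()) {
            System.out.println("obsolete: " + stale.getPath());
          }
        }
      }
    }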