diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 6d4c198..99c4435 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -69,6 +69,7 @@ public boolean accept(Path path) {
     }
   };
   public static final String BUCKET_DIGITS = "%05d";
+  public static final String LEGACY_FILE_BUCKET_DIGITS = "%06d";
   public static final String DELTA_DIGITS = "%07d";
   /**
    * 10K statements per tx. Probably overkill ... since that many delta files
@@ -80,7 +81,7 @@ public boolean accept(Path path) {
    */
   public static final int MAX_STATEMENTS_PER_TXN = 10000;
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
-  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+  public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
   public static final PathFilter originalBucketFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -145,7 +146,7 @@ public static Path createFilename(Path directory,
                                     AcidOutputFormat.Options options) {
     String subdir;
     if (options.getOldStyle()) {
-      return new Path(directory, String.format(BUCKET_DIGITS,
+      return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
           options.getBucket()) + "_0");
     } else if (options.isWritingBase()) {
       subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
@@ -453,6 +454,7 @@ public static Directory getAcidState(Path directory,
     final List deltas = new ArrayList();
     List working = new ArrayList();
     List originalDirectories = new ArrayList();
+    final List original = new ArrayList();
     final List obsolete = new ArrayList();
     List children = SHIMS.listLocatedStatus(fs, directory,
         hiddenFileFilter);
@@ -478,21 +480,27 @@ public static Directory getAcidState(Path directory,
             ValidTxnList.RangeResponse.NONE) {
           working.add(delta);
         }
-      } else {
+      } else if (child.isDir()) {
        // This is just the directory. We need to recurse and find the actual files. But don't
        // do this until we have determined there is no base. This saves time. Plus,
        // it is possible that the cleaner is running and removing these original files,
        // in which case recursing through them could cause us to get an error.
         originalDirectories.add(child);
+      } else {
+        original.add(child);
       }
     }
 
-    final List original = new ArrayList();
     // if we have a base, the original files are obsolete.
     if (bestBase != null) {
+      // Add original files to obsolete list if any
+      obsolete.addAll(original);
+      // Add original directories to obsolete list if any
+      obsolete.addAll(originalDirectories);
       // remove the entries so we don't get confused later and think we should
       // use them.
       original.clear();
+      originalDirectories.clear();
     } else {
       // Okay, we're going to need these originals. Recurse through them and figure out what we
       // really need.
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5aa2500..5a01695 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.hive.ql;
 
-import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
@@ -242,6 +245,147 @@ public void testNonAcidInsert() throws Exception {
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
     List rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
   }
+
+  /**
+   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidConversionAndMajorCompaction() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Insert another row to newly-converted ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+    // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    Worker w = new Worker();
+    w.setThreadId((int) w.getId());
+    w.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    w.init(stop, looped);
+    w.run();
+    // There should be 1 new directory: base_xxxxxxx.
+    // Original bucket files and delta directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    // Note: create a fake directory with fake files to simulate original directories/files
+    String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
+        "/subdir/000000_0";
+    String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
+        "/subdir/000000_1";
+    fs.create(new Path(fakeFile0));
+    fs.create(new Path(fakeFile1));
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 original directory, 1 base directory and 1 delta directory
+    Assert.assertEquals(5, status.length);
+    Cleaner c = new Cleaner();
+    c.setThreadId((int) c.getId());
+    c.setHiveConf(hiveConf);
+    stop = new AtomicBoolean();
+    looped = new AtomicBoolean();
+    stop.set(true);
+    c.init(stop, looped);
+    c.run();
+    // There should be only 1 directory left: base_xxxxxxx.
+    // Original bucket files and delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+    Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
   @Test
   public void testUpdateMixedCase() throws Exception {
     int[][] tableData = {{1,2},{3,3},{5,3}};
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f8ded12..1b598f7 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -42,10 +42,10 @@ public void testCreateFilename() throws Exception {
     Configuration conf = new Configuration();
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .setOldStyle(true).bucket(1);
-    assertEquals("/tmp/00001_0",
+    assertEquals("/tmp/000001_0",
         AcidUtils.createFilename(p, options).toString());
     options.bucket(123);
-    assertEquals("/tmp/00123_0",
+    assertEquals("/tmp/000123_0",
         AcidUtils.createFilename(p, options).toString());
     options.bucket(23)
         .minimumTransactionId(100)
@@ -224,10 +224,9 @@ public void testObsoleteOriginals() throws Exception {
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
-    // The two original buckets won't be in the obsolete list because we don't look at those
-    // until we have determined there is no base.
+    // Obsolete list should include the two original bucket files and the old base dir
     List obsolete = dir.getObsolete();
-    assertEquals(1, obsolete.size());
+    assertEquals(3, obsolete.size());
     assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 39c0571..5545574 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -293,7 +293,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
       if (bucket == 0 && !allBucketsPresent) continue; // skip one
       Path partFile = null;
       if (type == FileType.LEGACY) {
-        partFile = new Path(location, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0");
+        partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
       } else {
         Path dir = new Path(location, filename);
         fs.mkdirs(dir);
@@ -337,7 +337,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
       FileSystem fs = p.getFileSystem(conf);
       if (fs.exists(p)) filesToRead.add(p);
     } else {
-      filesToRead.add(new Path(baseDirectory, "00000_0"));
+      filesToRead.add(new Path(baseDirectory, "000000_0"));
     }
   }
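For orientation, here is a minimal, self-contained sketch of the rule the AcidUtils.getAcidState() hunk introduces: children of a table location that are neither base nor delta are split into original directories and original files, and both are reported as obsolete once a base directory exists, so the Cleaner can remove them. This is not part of the patch and does not use Hive API; the Child class and the listed directory names are hypothetical stand-ins, and delta selection is simplified for brevity.

// Illustrative sketch only; mirrors the classification logic described above with plain strings.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OriginalsBecomeObsoleteSketch {

  // Stand-in for a FileStatus-like listing entry; only the fields the sketch needs.
  static final class Child {
    final String name;
    final boolean isDir;
    Child(String name, boolean isDir) { this.name = name; this.isDir = isDir; }
  }

  public static void main(String[] args) {
    // Hypothetical listing of a converted table's location after a MAJOR compaction.
    List<Child> children = Arrays.asList(
        new Child("000000_0", false),             // pre-conversion bucket file
        new Child("000001_0", false),             // pre-conversion bucket file
        new Child("subdir", true),                // pre-conversion directory
        new Child("delta_0000017_0000017", true),
        new Child("base_0000017", true));

    String bestBase = null;
    List<Child> original = new ArrayList<>();
    List<Child> originalDirectories = new ArrayList<>();
    List<Child> obsolete = new ArrayList<>();

    for (Child child : children) {
      if (child.name.startsWith("base_")) {
        bestBase = child.name;                    // real code picks the best valid base
      } else if (child.name.startsWith("delta_")) {
        obsolete.add(child);                      // covered by the base in this example
      } else if (child.isDir) {
        originalDirectories.add(child);           // recurse later only if no base exists
      } else {
        original.add(child);                      // the new case: a loose original file
      }
    }

    if (bestBase != null) {
      // The behavioral change: original files AND original directories become obsolete
      // once a base covers their data, instead of being silently left behind.
      obsolete.addAll(original);
      obsolete.addAll(originalDirectories);
      original.clear();
      originalDirectories.clear();
    }

    obsolete.forEach(c -> System.out.println("obsolete: " + c.name));
  }
}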