diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java new file mode 100644 index 0000000000..c2aa73b5f1 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.processors.CommandProcessorException; +import org.apache.hive.hcatalog.streaming.DelimitedInputWriter; +import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.StreamingConnection; +import org.apache.hive.streaming.StreamingException; +import org.apache.hive.streaming.StrictDelimitedInputWriter; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + + +/** + * Collection of helper methods for compaction tests like TestCompactor. + */ +class CompactorTestUtil { + + private static final Logger LOG = LoggerFactory.getLogger(CompactorTestUtil.class); + + /** + * Get a list of base/delta directory names. + * @param fs the resolved file system + * @param filter filter down the contents to delta/delete delta directories. 
+   *               If this is null, no filter is applied
+   * @param table resolved table, where to search for delta directories
+   * @param partitionName the name of the partition, can be null, if table is not partitioned
+   * @return Collection of delta directory names, always non-null.
+   * @throws IOException if table location is unreachable.
+   */
+  static List<String> getBaseOrDeltaNames(FileSystem fs, PathFilter filter, Table table, String partitionName)
+      throws IOException {
+    Path path = partitionName == null ? new Path(table.getSd().getLocation()) : new Path(table.getSd().getLocation(),
+        partitionName);
+    FileStatus[] fileStatuses = filter != null ? fs.listStatus(path, filter) : fs.listStatus(path);
+    return Arrays.stream(fileStatuses).map(FileStatus::getPath).map(Path::getName).sorted()
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the bucket files under a delta directory.
+   * @param fs the resolved file system
+   * @param table resolved table, where to search for bucket files
+   * @param partitionName the name of the partition, can be null, if table is not partitioned
+   * @param deltaName the name of the delta directory, under which the search begins
+   * @return Collection of bucket file names, always non-null.
+   * @throws IOException if the table or delta directory location is unreachable.
+   */
+  static List<String> getBucketFileNames(FileSystem fs, Table table, String partitionName, String deltaName)
+      throws IOException {
+    Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
+        new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
+    return Arrays.stream(fs.listStatus(path)).map(FileStatus::getPath).map(Path::getName).sorted()
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Trigger a compaction run.
+   * @param conf hive configuration
+   * @param dbName database name
+   * @param tblName table name
+   * @param compactionType major/minor
+   * @param isQueryBased true, if query based compaction should be run
+   * @param partNames partition names
+   * @throws Exception if compaction cannot be started.
+   */
+  static void runCompaction(HiveConf conf, String dbName, String tblName, CompactionType compactionType,
+      boolean isQueryBased, String... partNames) throws Exception {
+    HiveConf hiveConf = new HiveConf(conf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    if (partNames.length == 0) {
+      txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
+      t.run();
+    } else {
+      for (String partName : partNames) {
+        CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType);
+        cr.setPartitionname(partName);
+        txnHandler.compact(cr);
+        t.run();
+      }
+    }
+  }
+
+  /**
+   * Trigger a compaction cleaner.
+   * @param hConf hive configuration
+   * @throws Exception if cleaner cannot be started.
+   */
+  static void runCleaner(HiveConf hConf) throws Exception {
+    HiveConf hiveConf = new HiveConf(hConf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Cleaner t = new Cleaner();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
+  /**
+   * Trigger compaction initiator.
+   * @param hConf hive configuration
+   * @param isQueryBased run compaction as query based
+   * @throws Exception if initiator cannot be started.
+ */ + static void runInitiator(HiveConf hConf, boolean isQueryBased) throws Exception { + HiveConf hiveConf = new HiveConf(hConf); + hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased); + AtomicBoolean stop = new AtomicBoolean(true); + Initiator t = new Initiator(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } + + /** + * Trigger compaction worker. + * @param hConf hive configuration + * @param isQueryBased run compaction as query based + * @throws Exception if worker cannot be started. + */ + static void runWorker(HiveConf hConf, boolean isQueryBased) throws Exception { + HiveConf hiveConf = new HiveConf(hConf); + hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased); + AtomicBoolean stop = new AtomicBoolean(true); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setConf(hiveConf); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + } + + /** + * Execute Hive CLI statement. + * @param cmd arbitrary statement to execute + * @param driver execution driver + * @throws Exception failed to execute statement + */ + void executeStatementOnDriver(String cmd, IDriver driver) throws Exception { + LOG.debug("Executing: " + cmd); + try { + driver.run(cmd); + } catch (CommandProcessorException e) { + throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e); + } + } + + /** + * Execute Hive CLI statement and get back result. + * @param cmd arbitrary statement to execute + * @param driver execution driver + * @return the result of the query + * @throws Exception failed to execute statement + */ + static List executeStatementOnDriverAndReturnResults(String cmd, IDriver driver) throws Exception { + LOG.debug("Executing: " + cmd); + try { + driver.run(cmd); + } catch (CommandProcessorException e) { + throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e); + } + List rs = new ArrayList<>(); + driver.getResults(rs); + return rs; + } + + /** + * Open a hive streaming connection, write some content in two transactions. + * @param conf hive configuration + * @param dbName name of the database + * @param tblName name of the table + * @param abort abort all transactions in connection + * @param keepOpen keep the streaming connection open after the transaction has been committed + * @return open streaming connection, can be null + * @throws StreamingException streaming connection cannot be established + */ + static StreamingConnection writeBatch(HiveConf conf, String dbName, String tblName, boolean abort, boolean keepOpen) + throws StreamingException { + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tblName) + .withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer) + .withTransactionBatchSize(2).connect(); + connection.beginTransaction(); + if (abort) { + connection.abortTransaction(); + } else { + connection.write("50,Kiev".getBytes()); + connection.write("51,St. 
Petersburg".getBytes()); + connection.write("44,Boston".getBytes()); + connection.commitTransaction(); + } + + if (!keepOpen) { + connection.beginTransaction(); + if (abort) { + connection.abortTransaction(); + } else { + connection.write("52,Tel Aviv".getBytes()); + connection.write("53,Atlantis".getBytes()); + connection.write("53,Boston".getBytes()); + connection.commitTransaction(); + } + connection.close(); + return null; + } + return connection; + } + + static void writeBatch(org.apache.hive.hcatalog.streaming.StreamingConnection connection, + DelimitedInputWriter writer, + boolean closeEarly) throws InterruptedException, org.apache.hive.hcatalog.streaming.StreamingException { + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("50,Kiev".getBytes()); + txnBatch.write("51,St. Petersburg".getBytes()); + txnBatch.write("44,Boston".getBytes()); + txnBatch.commit(); + if (!closeEarly) { + txnBatch.beginNextTransaction(); + txnBatch.write("52,Tel Aviv".getBytes()); + txnBatch.write("53,Atlantis".getBytes()); + txnBatch.write("53,Boston".getBytes()); + txnBatch.commit(); + txnBatch.close(); + } + } + + static void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty, String columnTypesProperty, + int bucket, long min, long max, List invaliWriteIDs, int numBuckets) throws IOException { + ValidWriteIdList writeIdList = new ValidWriteIdList() { + @Override + public String getTableName() { + return "AcidTable"; + } + + @Override + public boolean isWriteIdValid(long writeid) { + return true; + } + + @Override + public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) { + return RangeResponse.ALL; + } + + @Override + public String writeToString() { + return ""; + } + + @Override + public void readFromString(String src) { + + } + + @Override + public Long getMinOpenWriteId() { + return null; + } + + @Override + public long getHighWatermark() { + return Long.MAX_VALUE; + } + + @Override + public long[] getInvalidWriteIds() { + return new long[0]; + } + + @Override + public boolean isValidBase(long writeid) { + return true; + } + + @Override + public boolean isWriteIdAborted(long writeid) { + return true; + } + + @Override + public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) { + return RangeResponse.ALL; + } + }; + + OrcInputFormat aif = new OrcInputFormat(); + + Configuration conf = new Configuration(); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty); + conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets)); + conf.setBoolean("orc.schema.evolution.case.sensitive", false); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); + AcidInputFormat.RawReader reader = + aif.getRawReader(conf, true, bucket, writeIdList, base, deltas); + RecordIdentifier identifier = reader.createKey(); + OrcStruct value = reader.createValue(); + long currentTxn = min; + boolean seenCurrentTxn = false; + while (reader.next(identifier, value)) { + if (!seenCurrentTxn) { + Assert.assertEquals(currentTxn, identifier.getWriteId()); + seenCurrentTxn = true; + } + if (currentTxn != identifier.getWriteId()) { + if (invaliWriteIDs != null) { + Assert.assertFalse(invaliWriteIDs.contains(identifier.getWriteId())); + } + currentTxn++; + } + } + Assert.assertEquals(max, currentTxn); + } + + /** + * Turn a list of file statuses into string. 
+ * @param stat list of files + * @return string value + */ + static String printFileStatus(FileStatus[] stat) { + StringBuilder sb = new StringBuilder("stat{"); + if (stat == null) { + return sb.toString(); + } + for (FileStatus f : stat) { + sb.append(f.getPath()).append(","); + } + sb.setCharAt(sb.length() - 1, '}'); + return sb.toString(); + } + + static void runStreamingAPI(HiveConf conf, String dbName, String tblName, + List connectionOption) throws StreamingException { + List connections = new ArrayList<>(); + try { + for (StreamingConnectionOption option : connectionOption) { + connections.add(writeBatch(conf, dbName, tblName, option.abort, option.keepOpen)); + } + } finally { + connections.stream().filter(Objects::nonNull).forEach(StreamingConnection::close); + } + } + + static class StreamingConnectionOption { + private final boolean abort; + private final boolean keepOpen; + + StreamingConnectionOption(boolean abort, boolean keepOpen) { + this.abort = abort; + this.keepOpen = keepOpen; + } + } + +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 7ae33fadf7..f1e2bc968e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import java.io.File; import java.io.FileWriter; @@ -39,14 +38,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -63,21 +60,15 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; -import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.orc.OrcFile; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.common.util.Retry; @@ -102,6 +93,10 @@ import com.google.common.collect.Lists; +/** + * Compaction 
related unit tests. + */ +@SuppressWarnings("deprecation") public class TestCompactor { private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt()); private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class); @@ -623,11 +618,11 @@ public void minorCompactWhileStreaming() throws Exception { try { // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(dbName, tblName, false); + CompactorTestUtil.writeBatch(conf, dbName, tblName, false, false); } // Start a third batch, but don't close it. - connection = writeBatch(dbName, tblName, true); + connection = CompactorTestUtil.writeBatch(conf, dbName, tblName, false, true); // Now, compact TxnStore txnHandler = TxnUtils.getTxnStore(conf); @@ -652,10 +647,12 @@ public void minorCompactWhileStreaming() throws Exception { String[] expected = new String[]{"delta_0000001_0000002", "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"}; if (!Arrays.deepEquals(expected, names)) { - Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names) + ",stat=" + toString(stat)); + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names) + ",stat=" + + CompactorTestUtil.printFileStatus(stat)); } - checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, - 0, 1L, 4L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(null, new Path[] {resultFile}, columnNamesProperty, columnTypesProperty, 0, 1L, + 4L, null, 1); } finally { if (connection != null) { @@ -679,12 +676,12 @@ public void majorCompactWhileStreaming() throws Exception { try { // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(dbName, tblName, false); + CompactorTestUtil.writeBatch(conf, dbName, tblName, false, false); } // Start a third batch, but don't close it. 
this delta will be ignored by compaction since // it has an open txn in it - connection = writeBatch(dbName, tblName, true); + connection = CompactorTestUtil.writeBatch(conf, dbName, tblName, false, true); runMajorCompaction(dbName, tblName); @@ -699,7 +696,9 @@ public void majorCompactWhileStreaming() throws Exception { } String name = stat[0].getPath().getName(); Assert.assertEquals("base_0000004_v0000009", name); - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, null, + 1); } finally { if (connection != null) { connection.close(); @@ -751,7 +750,9 @@ private void minorCompactAfterAbort(boolean newStreamingAPI) throws Exception { if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(null, new Path[] {resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, + Lists.newArrayList(5, 6), 1); } @Test @@ -781,9 +782,6 @@ private void majorCompactAfterAbort(boolean newStreamingAPI) throws Exception { FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); - if (1 != stat.length) { - Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat)); - } if (1 != stat.length) { Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } @@ -791,7 +789,9 @@ private void majorCompactAfterAbort(boolean newStreamingAPI) throws Exception { if (!name.equals("base_0000004_v0000009")) { Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004"); } - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, + Lists.newArrayList(5, 6), 1); } @@ -1130,11 +1130,11 @@ private void majorCompactWhileStreamingForSplitUpdate(boolean newStreamingAPI) t if (newStreamingAPI) { // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(dbName, tblName, false); + CompactorTestUtil.writeBatch(conf, dbName, tblName, false, false); } // Start a third batch, but don't close it. - connection1 = writeBatch(dbName, tblName, true); + connection1 = CompactorTestUtil.writeBatch(conf, dbName, tblName, false, true); } else { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt); @@ -1142,11 +1142,11 @@ private void majorCompactWhileStreamingForSplitUpdate(boolean newStreamingAPI) t .newConnection(false, "UT_" + Thread.currentThread().getName()); // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(connection2, writer, false); + CompactorTestUtil.writeBatch(connection2, writer, false); } // Start a third batch, but don't close it. 
- writeBatch(connection2, writer, true); + CompactorTestUtil.writeBatch(connection2, writer, true); } runMajorCompaction(dbName, tblName); @@ -1161,7 +1161,9 @@ private void majorCompactWhileStreamingForSplitUpdate(boolean newStreamingAPI) t } String name = stat[0].getPath().getName(); Assert.assertEquals("base_0000004_v0000009", name); - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 1, 1L, 4L, 2); + CompactorTestUtil + .checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 1, 1L, 4L, null, + 2); if (connection1 != null) { connection1.close(); } @@ -1220,8 +1222,9 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, - 0, 1L, 2L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(null, new Path[] {minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, + 1L, 2L, null, 1); // Verify that we have got correct set of delete_deltas. FileStatus[] deleteDeltaStat = @@ -1239,8 +1242,8 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, - 0, 2L, 2L, 1); + CompactorTestUtil.checkExpectedTxnsPresent(null, new Path[] {minorCompactedDeleteDelta}, columnNamesProperty, + columnTypesProperty, 0, 2L, 2L, null, 1); } @Test @@ -1292,8 +1295,9 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, - 0, 1L, 2L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(null, new Path[] {minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, + 1L, 2L, null, 1); //Assert that we have no delete deltas if there are no input delete events. FileStatus[] deleteDeltaStat = @@ -1303,7 +1307,7 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception @Test public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { - minorCompactWhileStreamingWithSplitUpdate(true); + minorCompactWhileStreamingWithSplitUpdate(false); } @Test public void minorCompactWhileStreamingWithSplitUpdateNew() throws Exception { @@ -1326,11 +1330,11 @@ private void minorCompactWhileStreamingWithSplitUpdate(boolean newStreamingAPI) // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(dbName, tblName, false); + CompactorTestUtil.writeBatch(conf, dbName, tblName, false, false); } // Start a third batch, but don't close it. 
- connection1 = writeBatch(dbName, tblName, true); + connection1 = CompactorTestUtil.writeBatch(conf, dbName, tblName, false, true); } else { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt); @@ -1338,11 +1342,11 @@ private void minorCompactWhileStreamingWithSplitUpdate(boolean newStreamingAPI) .newConnection(false, "UT_" + Thread.currentThread().getName()); // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(connection2, writer, false); + CompactorTestUtil.writeBatch(connection2, writer, false); } // Start a third batch, but don't close it. - writeBatch(connection2, writer, true); + CompactorTestUtil.writeBatch(connection2, writer, true); } // Now, compact TxnStore txnHandler = TxnUtils.getTxnStore(conf); @@ -1369,8 +1373,9 @@ private void minorCompactWhileStreamingWithSplitUpdate(boolean newStreamingAPI) if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, - 0, 1L, 4L, 1); + CompactorTestUtil + .checkExpectedTxnsPresent(null, new Path[] {resultFile}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, + null, 1); //Assert that we have no delete deltas if there are no input delete events. FileStatus[] deleteDeltaStat = @@ -1705,141 +1710,6 @@ public void testCompactionOnDataLoadedInPath() throws Exception { return rsp.getCompacts(); } - private void writeBatch(org.apache.hive.hcatalog.streaming.StreamingConnection connection, - DelimitedInputWriter writer, - boolean closeEarly) throws InterruptedException, org.apache.hive.hcatalog.streaming.StreamingException { - TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); - txnBatch.beginNextTransaction(); - txnBatch.write("50,Kiev".getBytes()); - txnBatch.write("51,St. Petersburg".getBytes()); - txnBatch.write("44,Boston".getBytes()); - txnBatch.commit(); - if (!closeEarly) { - txnBatch.beginNextTransaction(); - txnBatch.write("52,Tel Aviv".getBytes()); - txnBatch.write("53,Atlantis".getBytes()); - txnBatch.write("53,Boston".getBytes()); - txnBatch.commit(); - txnBatch.close(); - } - } - - private StreamingConnection writeBatch(String dbName, String tblName, boolean closeEarly) throws StreamingException { - StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() - .withFieldDelimiter(',') - .build(); - StreamingConnection connection = HiveStreamingConnection.newBuilder() - .withDatabase(dbName) - .withTable(tblName) - .withAgentInfo("UT_" + Thread.currentThread().getName()) - .withHiveConf(conf) - .withRecordWriter(writer) - .withTransactionBatchSize(2) - .connect(); - connection.beginTransaction(); - connection.write("50,Kiev".getBytes()); - connection.write("51,St. 
Petersburg".getBytes()); - connection.write("44,Boston".getBytes()); - connection.commitTransaction(); - - if (!closeEarly) { - connection.beginTransaction(); - connection.write("52,Tel Aviv".getBytes()); - connection.write("53,Atlantis".getBytes()); - connection.write("53,Boston".getBytes()); - connection.commitTransaction(); - connection.close(); - return null; - } - return connection; - } - - private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty, - String columnTypesProperty, int bucket, long min, long max, int numBuckets) - throws IOException { - ValidWriteIdList writeIdList = new ValidWriteIdList() { - @Override - public String getTableName() { - return "AcidTable"; - } - - @Override - public boolean isWriteIdValid(long writeid) { - return true; - } - - @Override - public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) { - return RangeResponse.ALL; - } - - @Override - public String writeToString() { - return ""; - } - - @Override - public void readFromString(String src) { - - } - - @Override - public Long getMinOpenWriteId() { - return null; - } - - @Override - public long getHighWatermark() { - return Long.MAX_VALUE; - } - - @Override - public long[] getInvalidWriteIds() { - return new long[0]; - } - - @Override - public boolean isValidBase(long writeid) { - return true; - } - - @Override - public boolean isWriteIdAborted(long writeid) { - return true; - } - - @Override - public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) { - return RangeResponse.ALL; - } - }; - - OrcInputFormat aif = new OrcInputFormat(); - - Configuration conf = new Configuration(); - conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty); - conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty); - conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets)); - HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); - AcidInputFormat.RawReader reader = - aif.getRawReader(conf, true, bucket, writeIdList, base, deltas); - RecordIdentifier identifier = reader.createKey(); - OrcStruct value = reader.createValue(); - long currentTxn = min; - boolean seenCurrentTxn = false; - while (reader.next(identifier, value)) { - if (!seenCurrentTxn) { - Assert.assertEquals(currentTxn, identifier.getWriteId()); - seenCurrentTxn = true; - } - if (currentTxn != identifier.getWriteId()) { - Assert.assertEquals(currentTxn + 1, identifier.getWriteId()); - currentTxn++; - } - } - Assert.assertEquals(max, currentTxn); - } - /** * convenience method to execute a select stmt and dump results to log file */ @@ -1883,18 +1753,6 @@ static void createTestDataFile(String filename, String[] lines) throws IOExcepti } - private static String toString(FileStatus[] stat) { - StringBuilder sb = new StringBuilder("stat{"); - if(stat == null) { - return sb.toString(); - } - for(FileStatus f : stat) { - sb.append(f.getPath()).append(","); - } - sb.setCharAt(sb.length() - 1, '}'); - return sb.toString(); - } - private void verifyCompactions(List compacts, SortedSet partNames, String tblName) { for (ShowCompactResponseElement compact : compacts) { Assert.assertEquals("default", compact.getDbname()); @@ -1909,34 +1767,11 @@ private void processStreamingAPI(String dbName, String tblName, boolean newStrea throws StreamingException, ClassNotFoundException, org.apache.hive.hcatalog.streaming.StreamingException, InterruptedException { if (newStreamingAPI) { - StreamingConnection connection = null; - try { - 
// Write a couple of batches - for (int i = 0; i < 2; i++) { - connection = writeBatch(dbName, tblName, false); - assertNull(connection); - } - - StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() - .withFieldDelimiter(',') - .build(); - StreamingConnection connection2 = HiveStreamingConnection.newBuilder() - .withDatabase(dbName) - .withTable(tblName) - .withAgentInfo("UT_" + Thread.currentThread().getName()) - .withHiveConf(conf) - .withRecordWriter(writer) - .withTransactionBatchSize(2) - .connect(); - // Start a third batch, but don't close it. - connection2.beginTransaction(); - connection2.abortTransaction(); - connection2.close(); - } finally { - if (connection != null) { - connection.close(); - } - } + List options = Lists + .newArrayList(new CompactorTestUtil.StreamingConnectionOption(false, false), + new CompactorTestUtil.StreamingConnectionOption(false, false), + new CompactorTestUtil.StreamingConnectionOption(true, false)); + CompactorTestUtil.runStreamingAPI(conf, dbName, tblName, options); } else { HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt); @@ -1945,7 +1780,7 @@ private void processStreamingAPI(String dbName, String tblName, boolean newStrea try { // Write a couple of batches for (int i = 0; i < 2; i++) { - writeBatch(connection, writer, false); + CompactorTestUtil.writeBatch(connection, writer, false); } // Start a third batch, abort everything, don't properly close it diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index b7245e2c35..7afef0fce6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.ql.txn.compactor; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -34,39 +34,37 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import 
org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.StreamingConnection; +import org.apache.hive.streaming.StrictDelimitedInputWriter; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; +import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults; @SuppressWarnings("deprecation") -// TODO: Add tests for bucketing_version=1 when HIVE-21167 is fixed public class TestCrudCompactorOnTez { private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt()); - private static final Logger LOG = LoggerFactory.getLogger(TestCrudCompactorOnTez.class); private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt .getAndIncrement()).getPath().replaceAll("\\\\", "/"); private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; private HiveConf conf; - IMetaStoreClient msClient; + private IMetaStoreClient msClient; private IDriver driver; @Before @@ -179,8 +177,8 @@ public void testMajorCompaction() throws Exception { + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver); Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1); // Run major compaction and cleaner - runCompaction(dbName, tblName, CompactionType.MAJOR); - runCleaner(conf); + CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true); + CompactorTestUtil.runCleaner(conf); // Should contain only one base directory now filestatus = fs.listStatus(new Path(table.getSd().getLocation())); String[] bases = new String[filestatus.length]; @@ -205,51 +203,633 @@ public void testMajorCompaction() throws Exception { } @Test - public void testMinorCompactionDisabled() throws Exception { + public void testMinorCompactionNotPartitionedWithoutBuckets() throws Exception { String dbName = "default"; - String tblName = "testMinorCompactionDisabled"; - executeStatementOnDriver("drop table if exists " + tblName, driver); - executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" - + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," - + " 'transactional_properties'='default')", driver); - executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); - executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); - executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTable(tableName, false, false); // Find the location of the table IMetaStoreClient msClient = new HiveMetaStoreClient(conf); - Table table = msClient.getTable(dbName, tblName); + Table table = msClient.getTable(dbName, tableName); FileSystem fs = FileSystem.get(conf); - // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present - FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); - String[] deltas = new 
String[filestatus.length]; - for (int i = 0; i < deltas.length; i++) { - deltas[i] = filestatus[i].getPath().getName(); - } - Arrays.sort(deltas); - String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" }; - if (!Arrays.deepEquals(expectedDeltas, deltas)) { - Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); - } - // Verify that delete delta (delete_delta_0000003_0000003_0000) is present - FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()), - AcidUtils.deleteEventDeltaDirFilter); - String[] deleteDeltas = new String[deleteDeltaStat.length]; - for (int i = 0; i < deleteDeltas.length; i++) { - deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + // Insert test data into test table + dataProvider.insertTestData(tableName); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + // Verify deltas + Assert.assertEquals("Delta directories does not match", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000004_0000004_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Verify delete delta + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // Only 1 compaction should be in the response queue with succeeded state + List compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + // Verify delta directories after compaction + List actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000005_v0000009"), actualDeltasAfterComp); + List actualDeleteDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null); + Assert.assertEquals("Delete delta directories does not match after compaction", + Collections.singletonList("delete_delta_0000001_0000005_v0000009"), actualDeleteDeltasAfterComp); + // Verify bucket files in delta dirs + List expectedBucketFiles = Collections.singletonList("bucket_00000"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0))); + // Verify contents of bucket files. 
+ // Bucket 0 + List expectedRsBucket0 = Arrays.asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t2\t4", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":4}\t4\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":5}\t4\t4", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t5\t4", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t2", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":4}\t6\t3", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":5}\t6\t4"); + List rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + Assert.assertEquals(expectedRsBucket0, rsBucket0); + // Verify all contents + List actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test + public void testMinorCompactionNotPartitionedWithBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTable(tableName, false, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestData(tableName); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + // Verify deltas + Assert.assertEquals("Delta directories does not match", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000004_0000004_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Verify delete delta + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // Only 1 compaction should be in the response queue with succeeded state + List compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + // Verify delta directories after compaction + List actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000005_v0000009"), actualDeltasAfterComp); + List actualDeleteDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null); + Assert.assertEquals("Delete delta directories does not match after compaction", + Collections.singletonList("delete_delta_0000001_0000005_v0000009"), actualDeleteDeltasAfterComp); + // Verify bucket files in delta dirs 
+ List expectedBucketFiles = Arrays.asList("bucket_00000", "bucket_00001"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0))); + // Verify contents of bucket files. + // Bucket 0 + List expectedRsBucket0 = Arrays.asList("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4"); + List rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + Assert.assertEquals(expectedRsBucket0, rsBucket0); + // Bucket 1 + List expectedRs1Bucket = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t2\t4", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t5\t4", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t2", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":4}\t6\t3", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":5}\t6\t4"); + List rsBucket1 = dataProvider.getBucketData(tableName, "536936448"); + Assert.assertEquals(expectedRs1Bucket, rsBucket1); + // Verify all contents + List actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test + public void testMinorCompactionPartitionedWithoutBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTable(tableName, true, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestDataPartitioned(tableName); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + // Verify deltas + String partitionToday = "ds=today"; + String partitionTomorrow = "ds=tomorrow"; + String partitionYesterday = "ds=yesterday"; + Assert.assertEquals("Delta directories does not match", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000004_0000004_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday)); + // Verify delete delta + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"), + CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, partitionToday)); + // Run a compaction + CompactorTestUtil + .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday, partitionTomorrow, + partitionYesterday); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // 3 compaction should be in the response queue with succeeded state + List compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); 
+ Assert.assertEquals("Completed compaction queue must contain 3 element", 3, compacts.size()); + compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + // Verify delta directories after compaction in each partition + List actualDeltasAfterCompPartToday = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000005_v0000009"), actualDeltasAfterCompPartToday); + List actualDeleteDeltasAfterCompPartToday = + CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, partitionToday); + Assert.assertEquals("Delete delta directories does not match after compaction", + Collections.singletonList("delete_delta_0000001_0000005_v0000009"), actualDeleteDeltasAfterCompPartToday); + // Verify bucket files in delta dirs + List expectedBucketFiles = Collections.singletonList("bucket_00000"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0))); + // Verify contents of bucket files. + // Bucket 0 + List expectedRsBucket0 = Arrays + .asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday"); + List rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + Assert.assertEquals(expectedRsBucket0, rsBucket0); + // Verify all contents + List actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test + public void testMinorCompactionPartitionedWithBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTable(tableName, true, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestDataPartitioned(tableName); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + // Verify deltas + String partitionToday = "ds=today"; + String partitionTomorrow = "ds=tomorrow"; + String partitionYesterday = "ds=yesterday"; + 
Assert.assertEquals("Delta directories does not match",
+        Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000004_0000004_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday));
+    // Verify delete delta
+    Assert.assertEquals("Delete directories does not match",
+        Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"),
+        CompactorTestUtil
+            .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, partitionToday));
+    // Run a compaction
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday, partitionTomorrow,
+            partitionYesterday);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+    // 3 compactions should be in the response queue with succeeded state
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain 3 element", 3, compacts.size());
+    compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+    // Verify delta directories after compaction in each partition
+    List<String> actualDeltasAfterCompPartToday =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList("delta_0000001_0000005_v0000009"), actualDeltasAfterCompPartToday);
+    List<String> actualDeleteDeltasAfterCompPartToday =
+        CompactorTestUtil
+            .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, partitionToday);
+    Assert.assertEquals("Delete delta directories does not match after compaction",
+        Collections.singletonList("delete_delta_0000001_0000005_v0000009"), actualDeleteDeltasAfterCompPartToday);
+    // Verify bucket files in delta dirs
+    List<String> expectedBucketFiles = Arrays.asList("bucket_00000", "bucket_00001");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0)));
+    // Verify contents of bucket files.
+ // Bucket 0 + List expectedRsBucket0 = Arrays.asList("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday"); + List rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + Assert.assertEquals(expectedRsBucket0, rsBucket0); + // Bucket 1 + List expectedRsBucket1 = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday"); + List rsBucket1 = dataProvider.getBucketData(tableName, "536936448"); + Assert.assertEquals(expectedRsBucket1, rsBucket1); + // Verify all contents + List actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test + public void testMinorCompaction10DeltaDirs() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestData(tableName, 10); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Verify deltas + List deltaNames = CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(10, deltaNames.size()); + List deleteDeltaName = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null); + Assert.assertEquals(5, deleteDeltaName.size()); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + List compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain 3 element", 1, compacts.size()); + compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + // Verify delta directories after compaction + List actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(Collections.singletonList("delta_0000001_0000015_v0000019"), actualDeltasAfterComp); + List actualDeleteDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null); + Assert + .assertEquals(Collections.singletonList("delete_delta_0000001_0000015_v0000019"), actualDeleteDeltasAfterComp); + // Verify bucket file in delta dir + List expectedBucketFile = Collections.singletonList("bucket_00000"); + 
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
+ // Verify contents of bucket file
+ List rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
+ Assert.assertEquals(5, rsBucket0.size());
+ // Verify all contents
+ List actualData = dataProvider.getAllData(tableName);
+ Assert.assertEquals(expectedData, actualData);
+ // Clean up
+ dataProvider.dropTable(tableName);
+ }
+
+ @Test
+ public void testMultipleMinorCompactions() throws Exception {
+ String dbName = "default";
+ String tableName = "testMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createTable(tableName, false, true);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertTestData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+ // Only 1 compaction should be in the response queue with succeeded state
+ List compacts =
+ TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+ Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+ // Insert test data into test table
+ dataProvider.insertTestData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+ // 2 compactions should be in the response queue with succeeded state
+ compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain two elements", 2, compacts.size());
+ Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
+ // Insert test data into test table
+ dataProvider.insertTestData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+ // 3 compactions should be in the response queue with succeeded state
+ compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain three elements", 3, compacts.size());
+ Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(2).getState());
+ // Verify delta directories after compaction
+ List actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals("Delta directories do not match after compaction",
+ Collections.singletonList("delta_0000001_0000015_v0000044"), actualDeltasAfterComp);
+ List actualDeleteDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null);
+ Assert.assertEquals("Delete delta directories do 
not match after compaction", + Collections.singletonList("delete_delta_0000001_0000015_v0000044"), actualDeleteDeltasAfterComp); + + } + + @Test + public void testMinorCompactionWhileStreaming() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + StreamingConnection connection = null; + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + CompactorTestUtil.writeBatch(conf, dbName, tableName, false, false); + } + + // Start a third batch, but don't close it. + connection = CompactorTestUtil.writeBatch(conf, dbName, tableName, false, true); + + // Now, compact + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Delta names does not match", Arrays + .asList("delta_0000001_0000002", "delta_0000001_0000005_v0000009", "delta_0000003_0000004", + "delta_0000005_0000006"), CompactorTestUtil.getBaseOrDeltaNames(fs, null, table, null)); + CompactorTestUtil.checkExpectedTxnsPresent(null, + new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000005_v0000009")}, "a,b", "int:string", + 0, 1L, 4L, null, 1); + + } finally { + if (connection != null) { + connection.close(); + } } - Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" }; - if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { - Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + + @Test + public void testMinorCompactionWhileStreamingAfterAbort() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + CompactorTestUtil.runStreamingAPI(conf, dbName, tableName, Lists + .newArrayList(new CompactorTestUtil.StreamingConnectionOption(false, false), + new CompactorTestUtil.StreamingConnectionOption(false, false), + new CompactorTestUtil.StreamingConnectionOption(true, false))); + // Now, compact + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Delta names does not match", + Arrays.asList("delta_0000001_0000002", "delta_0000001_0000006_v0000009", "delta_0000003_0000004"), + CompactorTestUtil.getBaseOrDeltaNames(fs, null, table, null)); + CompactorTestUtil.checkExpectedTxnsPresent(null, + new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000006_v0000009")}, "a,b", "int:string", 0, + 1L, 4L, Lists.newArrayList(5, 6), 1); + } + + @Test + public void testMinorCompactionWhileStreamingWithAbort() throws Exception { + String dbName = "default"; + String tableName = 
"testMinorCompaction"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver( + "CREATE TABLE " + tableName + "(a INT, b STRING) " + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", + driver); + CompactorTestUtil.runStreamingAPI(conf, dbName, tableName, Lists + .newArrayList(new CompactorTestUtil.StreamingConnectionOption(false, false), + new CompactorTestUtil.StreamingConnectionOption(true, false), + new CompactorTestUtil.StreamingConnectionOption(false, false))); + // Now, copact + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Delta names does not match", + Arrays.asList("delta_0000001_0000002", "delta_0000001_0000006_v0000009", "delta_0000005_0000006"), + CompactorTestUtil.getBaseOrDeltaNames(fs, null, table, null)); + CompactorTestUtil.checkExpectedTxnsPresent(null, + new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000006_v0000009")}, "a,b", "int:string", 0, + 1L, 6L, Lists.newArrayList(3, 4), 1); + } + + @Test + public void testMinorCompactionWhileStreamingWithAbortInMiddle() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver( + "CREATE TABLE " + tableName + "(a INT, b STRING) " + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", + driver); + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build(); + StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName) + .withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer).connect(); + connection.beginTransaction(); + connection.write("50,Kiev".getBytes()); + connection.write("51,St. 
Petersburg".getBytes()); + connection.write("52,Boston".getBytes()); + connection.commitTransaction(); + connection.beginTransaction(); + connection.write("60,Budapest".getBytes()); + connection.abortTransaction(); + connection.beginTransaction(); + connection.write("71,Szeged".getBytes()); + connection.write("72,Debrecen".getBytes()); + connection.commitTransaction(); + connection.close(); + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Delta names does not match", Collections.singletonList("delta_0000001_0000003_v0000006"), + CompactorTestUtil.getBaseOrDeltaNames(fs, null, table, null)); + CompactorTestUtil.checkExpectedTxnsPresent(null, + new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000003_v0000006")}, "a,b", "int:string", 0, + 1L, 3L, Lists.newArrayList(2), 1); + } + + @Test + public void testMajorCompactionAfterMinor() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestData(tableName); + // Get all data before compaction is run + List expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // Only 1 compaction should be in the response queue with succeeded state + List compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + // Verify delta directories after compaction + Assert.assertEquals("Delta directories does not match after minor compaction", + Collections.singletonList("delta_0000001_0000005_v0000009"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + Assert.assertEquals("Delete delta directories does not match after minor compaction", + Collections.singletonList("delete_delta_0000001_0000005_v0000009"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + // Verify all contents + List actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // 2 compaction should be in the response queue with succeeded state + compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); + 
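Note (editorial, not part of the patch): the base directory name verified right below encodes the highest compacted write id and the compactor transaction id (the _v suffix set via visibilityTxnId). A small self-contained sketch, with an illustrative regex, that splits the two numbers apart:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BaseDirNameSketch {
  // base_<maxWriteId>_v<visibilityTxnId>, e.g. base_0000005_v0000023
  private static final Pattern BASE = Pattern.compile("base_(\\d+)_v(\\d+)");

  public static void main(String[] args) {
    Matcher m = BASE.matcher("base_0000005_v0000023");
    if (m.matches()) {
      long maxWriteId = Long.parseLong(m.group(1));       // 5: highest write id compacted into the base
      long visibilityTxnId = Long.parseLong(m.group(2));  // 23: txn id of the compactor run
      System.out.println("maxWriteId=" + maxWriteId + ", visibilityTxnId=" + visibilityTxnId);
    }
  }
}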
// Verify base directory after compaction + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000005_v0000023"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + // Verify all contents + actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + } + + @Test + public void testMinorCompactionWhileStreamingWithSplitUpdate() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + "'transactional_properties'='default')", driver); + StreamingConnection connection = null; + // Write a couple of batches + try { + for (int i = 0; i < 2; i++) { + CompactorTestUtil.writeBatch(conf, dbName, tableName, false, false); + } + // Start a third batch, but don't close it. + connection = CompactorTestUtil.writeBatch(conf, dbName, tableName, false, true); + // Now, compact + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Delta names does not match", Arrays + .asList("delta_0000001_0000002", "delta_0000001_0000005_v0000009", "delta_0000003_0000004", + "delta_0000005_0000006"), CompactorTestUtil.getBaseOrDeltaNames(fs, null, table, null)); + CompactorTestUtil.checkExpectedTxnsPresent(null, + new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000005_v0000009")}, "a,b", "int:string", + 0, 1L, 4L, null, 1); + //Assert that we have no delete deltas if there are no input delete events. 
+ Assert.assertEquals(0, + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null).size()); + } finally { + if (connection != null) { + connection.close(); + } } - // Initiate a compaction, make sure it's not queued - runInitiator(conf); - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(0, compacts.size()); - // Clean up - executeStatementOnDriver("drop table " + tblName, driver); + } @Test @@ -273,8 +853,9 @@ public void testCompactionWithSchemaEvolutionAndBuckets() throws Exception { + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); // Run major compaction and cleaner - runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today"); - runCleaner(conf); + CompactorTestUtil + .runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "ds=yesterday", "ds=today"); + CompactorTestUtil.runCleaner(conf); List expectedRsBucket0PtnToday = new ArrayList<>(); expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t3\tNULL\ttoday"); expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t4\tNULL\ttoday"); @@ -317,8 +898,9 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); // Run major compaction and cleaner - runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today"); - runCleaner(hiveConf); + CompactorTestUtil + .runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "ds=yesterday", "ds=today"); + CompactorTestUtil.runCleaner(hiveConf); List expectedRsPtnToday = new ArrayList<>(); expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t3\tNULL\ttoday"); expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\tNULL\ttoday"); @@ -341,86 +923,68 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws executeStatementOnDriver("drop table " + tblName, driver); } - private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... 
partNames) - throws Exception { - HiveConf hiveConf = new HiveConf(conf); - hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); - TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - t.init(new AtomicBoolean(true), new AtomicBoolean()); - if (partNames.length == 0) { - txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType)); - t.run(); - } else { - for (String partName : partNames) { - CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType); - cr.setPartitionname(partName); - txnHandler.compact(cr); - t.run(); + private class TestDataProvider { + + private void createTable(String tblName, boolean isPartitioned, boolean isBucketed) throws Exception { + executeStatementOnDriver("drop table if exists " + tblName, driver); + StringBuilder query = new StringBuilder(); + query.append("create table ").append(tblName).append(" (a string, b int)"); + if (isPartitioned) { + query.append(" partitioned by (ds string)"); + } + if (isBucketed) { + query.append(" clustered by (a) into 2 buckets"); } + query.append(" stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')"); + executeStatementOnDriver(query.toString(), driver); } - } - static void runInitiator(HiveConf hConf) throws Exception { - HiveConf hiveConf = new HiveConf(hConf); - hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); - AtomicBoolean stop = new AtomicBoolean(true); - Initiator t = new Initiator(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); - } + private void insertTestDataPartitioned(String tblName) throws Exception { + executeStatementOnDriver("insert into " + tblName + + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow')," + + "('2',3, 'yesterday'),('2',4, 'today')", driver); + executeStatementOnDriver("insert into " + tblName + + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today')," + + "('4',3, 'tomorrow'),('4',4, 'today')", driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday')," + + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver); + executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver); + } - static void runWorker(HiveConf hConf) throws Exception { - HiveConf hiveConf = new HiveConf(hConf); - hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); - AtomicBoolean stop = new AtomicBoolean(true); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); - } + private void insertTestData(String tblName) throws Exception { + executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)", + driver); + executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)", + driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)", + driver); + executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver); + } - static void 
runCleaner(HiveConf hConf) throws Exception { - HiveConf hiveConf = new HiveConf(hConf); - AtomicBoolean stop = new AtomicBoolean(true); - Cleaner t = new Cleaner(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - AtomicBoolean looped = new AtomicBoolean(); - t.init(stop, looped); - t.run(); - } + private void insertTestData(String tblName, int iterations) throws Exception { + for (int i = 0; i < iterations; i++) { + executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver); + } + for (int i = 0; i < iterations; i += 2) { + executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver); + } + } - /** - * Execute Hive CLI statement - * - * @param cmd arbitrary statement to execute - */ - static void executeStatementOnDriver(String cmd, IDriver driver) throws Exception { - LOG.debug("Executing: " + cmd); - try { - driver.run(cmd); - } catch (CommandProcessorException e) { - throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e); + private List getAllData(String tblName) throws Exception { + List result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver); + Collections.sort(result); + return result; } - } - static List executeStatementOnDriverAndReturnResults(String cmd, IDriver driver) throws Exception { - LOG.debug("Executing: " + cmd); - try { - driver.run(cmd); - } catch (CommandProcessorException e) { - throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e); + private List getBucketData(String tblName, String bucketId) throws Exception { + return executeStatementOnDriverAndReturnResults( + "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver); + } + + private void dropTable(String tblName) throws Exception { + executeStatementOnDriver("drop table " + tblName, driver); } - List rs = new ArrayList(); - driver.getResults(rs); - return rs; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java index 33d723a02e..076b77877a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -56,6 +58,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; + /** * SplitGrouper is used to combine splits based on head room and locality. It * also enforces restrictions around schema, file format and bucketing. @@ -175,10 +178,10 @@ public Multimap generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, float waves, int availableSlots, String inputName, boolean groupAcrossFiles, SplitLocationProvider locationProvider) throws Exception { + boolean isMinorCompaction = true; MapWork mapWork = populateMapWork(jobConf, inputName); // ArrayListMultimap is important here to retain the ordering for the splits. Multimap schemaGroupedSplitMultiMap = ArrayListMultimap. 
create(); - if (HiveConf.getVar(jobConf, HiveConf.ConfVars.SPLIT_GROUPING_MODE).equalsIgnoreCase("compactor")) { List paths = Utilities.getInputPathsTez(jobConf, mapWork); for (Path path : paths) { @@ -187,7 +190,9 @@ Operator op = mapWork.getAliasToWork().get(aliases.get(0)); if ((op != null) && (op instanceof TableScanOperator)) { TableScanOperator tableScan = (TableScanOperator) op; - if (!tableScan.getConf().isTranscationalTable()) { + PartitionDesc partitionDesc = mapWork.getAliasToPartnInfo().get(aliases.get(0)); + isMinorCompaction &= isQueryBasedMinorComp(partitionDesc); + if (!tableScan.getConf().isTranscationalTable() && !isMinorCompaction) { String splitPath = getFirstSplitPath(splits); String errorMessage = "Compactor split grouping is enabled only for transactional tables. Please check the path: " @@ -209,7 +214,7 @@ * Create a TezGroupedSplit for each bucketId and return. * TODO: Are there any other config values (split size etc) that can override this per writer split grouping? */ - return getCompactorSplitGroups(splits, conf); + return getCompactorSplitGroups(splits, conf, isMinorCompaction); } int i = 0; @@ -228,6 +233,25 @@ this.group(jobConf, schemaGroupedSplitMultiMap, availableSlots, waves, locationProvider); return groupedSplits; } + + /** + * Determines from the partition description, whether the table is used is used as compaction helper table. It looks + * for a table property 'queryminorcomp', which is set in + * {@link org.apache.hadoop.hive.ql.txn.compactor.MinorQueryCompactor} + * @param desc partition description of the table, must be not null + * @return true, if the table is a query based minor compaction helper table + */ + private boolean isQueryBasedMinorComp(PartitionDesc desc) { + if (desc != null) { + Properties tblProperties = desc.getTableDesc().getProperties(); + final String minorCompProperty = "queryminorcomp"; + if (tblProperties != null && tblProperties.containsKey(minorCompProperty) && tblProperties + .getProperty(minorCompProperty).equalsIgnoreCase("true")) { + return true; + } + } + return false; + } // Returns the path of the first split in this list for logging purposes private String getFirstSplitPath(InputSplit[] splits) { @@ -244,26 +268,40 @@ private String getFirstSplitPath(InputSplit[] splits) { * Takes a list of {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}s * and groups them for Acid Compactor, creating one TezGroupedSplit per bucket number. */ - Multimap getCompactorSplitGroups(InputSplit[] rawSplits, Configuration conf) { + Multimap getCompactorSplitGroups(InputSplit[] rawSplits, Configuration conf, + boolean isMinorCompaction) { // Note: For our case, this multimap will essentially contain one value (one TezGroupedSplit) per key Multimap bucketSplitMultiMap = ArrayListMultimap. 
create(); HiveInputFormat.HiveInputSplit[] splits = new HiveInputFormat.HiveInputSplit[rawSplits.length]; int i = 0; for (InputSplit is : rawSplits) { - splits[i++] = (HiveInputFormat.HiveInputSplit) is; + HiveInputFormat.HiveInputSplit hiveInputSplit = (HiveInputFormat.HiveInputSplit) is; + OrcSplit o1 = (OrcSplit) hiveInputSplit.getInputSplit(); + try { + if (isMinorCompaction) { + o1.parse(conf, o1.getRootDir().getParent()); + } else { + o1.parse(conf); + } + } catch (IOException e) { + throw new RuntimeException(); + } + splits[i++] = hiveInputSplit; } - Arrays.sort(splits, new ComparatorCompactor(conf)); + Arrays.sort(splits, new ComparatorCompactor()); TezGroupedSplit tgs = null; int previousWriterId = Integer.MIN_VALUE; Path rootDir = null; for (i = 0; i < splits.length; i++) { int writerId = ((OrcSplit) splits[i].getInputSplit()).getBucketId(); - if (rootDir == null) { - rootDir = ((OrcSplit) splits[i].getInputSplit()).getRootDir(); - } - Path rootDirFromCurrentSplit = ((OrcSplit) splits[i].getInputSplit()).getRootDir(); - // These splits should belong to the same partition - assert rootDir == rootDirFromCurrentSplit; + if (!isMinorCompaction) { + if (rootDir == null) { + rootDir = ((OrcSplit) splits[i].getInputSplit()).getRootDir(); + } + Path rootDirFromCurrentSplit = ((OrcSplit) splits[i].getInputSplit()).getRootDir(); + // These splits should belong to the same partition + assert rootDir.equals(rootDirFromCurrentSplit); + } if (writerId != previousWriterId) { // Create a new grouped split for this writerId tgs = new TezGroupedSplit(1, "org.apache.hadoop.hive.ql.io.HiveInputFormat", null, null); @@ -275,12 +313,8 @@ private String getFirstSplitPath(InputSplit[] splits) { return bucketSplitMultiMap; } - static class ComparatorCompactor implements Comparator { - private Configuration conf; - private ComparatorCompactor(Configuration conf) { - this.conf = conf; - } - + static class ComparatorCompactor implements Comparator, Serializable { + @Override public int compare(HiveInputFormat.HiveInputSplit h1, HiveInputFormat.HiveInputSplit h2) { //sort: bucketId,writeId,stmtId,rowIdOffset,splitStart @@ -289,12 +323,6 @@ public int compare(HiveInputFormat.HiveInputSplit h1, HiveInputFormat.HiveInputS } OrcSplit o1 = (OrcSplit)h1.getInputSplit(); OrcSplit o2 = (OrcSplit)h2.getInputSplit(); - try { - o1.parse(conf); - o2.parse(conf); - } catch(IOException ex) { - throw new RuntimeException(ex); - } // Note: this is the bucket number as seen in the file name. // Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute. // See: {@link org.apache.hadoop.hive.ql.io.BucketCodec.V1} for details. 
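Note (editorial, not part of the patch): the SplitGrouper change above hinges on the 'queryminorcomp' table property that MinorQueryCompactor sets on its helper tables; when it is present, the transactional-table guard and the per-partition root-dir assertion are relaxed. A simplified, standalone restatement of that check (the real isQueryBasedMinorComp reads the property off a PartitionDesc):

import java.util.Properties;

public class MinorCompTablePropCheck {
  private static final String MINOR_COMP_PROPERTY = "queryminorcomp";

  // Simplified version of SplitGrouper#isQueryBasedMinorComp: true only when the
  // table properties explicitly carry queryminorcomp=true.
  static boolean isQueryBasedMinorComp(Properties tblProperties) {
    return tblProperties != null
        && "true".equalsIgnoreCase(tblProperties.getProperty(MINOR_COMP_PROPERTY));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(MINOR_COMP_PROPERTY, "true");
    System.out.println(isQueryBasedMinorComp(props));          // true
    System.out.println(isQueryBasedMinorComp(new Properties())); // false
  }
}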
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index dde8878769..812da83c4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.RecordReader; import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; @@ -79,9 +81,11 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.IntWritable; import org.apache.hive.common.util.Ref; import org.apache.orc.FileFormatException; import org.apache.orc.impl.OrcAcidUtils; @@ -422,6 +426,27 @@ public static int parseBucketId(Path bucketFile) { return -1; } + /** + * Read the first row of an ORC file and determine the bucket ID based on the bucket column. This only works with + * files with ACID schema. + * @param fs the resolved file system + * @param orcFile path to ORC file + * @return resolved bucket number + * @throws IOException during the parsing of the ORC file + */ + public static Optional parseBucketIdFromRow(FileSystem fs, Path orcFile) throws IOException { + Reader reader = OrcFile.createReader(fs, orcFile); + StructObjectInspector objectInspector = (StructObjectInspector)reader.getObjectInspector(); + RecordReader records = reader.rows(); + while(records.hasNext()) { + Object row = records.next(null); + List fields = objectInspector.getStructFieldsDataAsList(row); + int bucketProperty = ((IntWritable) fields.get(2)).get(); + return Optional.of(BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty)); + } + return Optional.empty(); + } + /** * Parse a bucket filename back into the options that would have created * the file. 
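Note (editorial, not part of the patch): a hedged usage sketch for the new AcidUtils.parseBucketIdFromRow helper added above. The path is hypothetical, and the call assumes the file was written with the ACID schema, since the bucket property is taken from column index 2 of the first row:

import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class ParseBucketIdFromRowExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical compactor output file that still carries an "original" style name (000000_0).
    Path orcFile = new Path("/warehouse/tablespace/managed/hive/t/db_tmp_compactor_t_1/000000_0");
    Optional<Integer> bucketId = AcidUtils.parseBucketIdFromRow(fs, orcFile);
    // Empty means the file had no rows; otherwise this is the decoded writer (bucket) id.
    System.out.println(bucketId.map(b -> "bucket " + b).orElse("no rows"));
  }
}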
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 2ac6232460..b8a0f0465c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1224,7 +1224,8 @@ static TransactionMetaData findWriteIDForSynthetcRowIDs(Path splitPath, Path roo } while(parent != null && !rootPath.equals(parent)) { boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX); - boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX); + boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX) + || parent.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX); if(isBase || isDelta) { if(isBase) { return new TransactionMetaData(AcidUtils.ParsedBase.parseBase(parent).getWriteId(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 0a96fc30b3..e71dc7d8b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -338,8 +338,12 @@ public int getBucketId() { } public void parse(Configuration conf) throws IOException { + parse(conf, rootDir); + } + + public void parse(Configuration conf, Path rootPath) throws IOException { OrcRawRecordMerger.TransactionMetaData tmd = - OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootDir, conf); + OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootPath, conf); writeId = tmd.syntheticWriteId; stmtId = tmd.statementId; AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index ad6817c32b..600ad323b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1211,12 +1211,6 @@ private void analyzeAlterTableCompact(ASTNode ast, TableName tableName, String type = unescapeSQLString(ast.getChild(0).getText()).toLowerCase(); - if (type.equalsIgnoreCase("minor") && HiveConf.getBoolVar(conf, ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { - throw new SemanticException( - "Minor compaction is not currently supported for query based compaction (enabled by setting: " - + ConfVars.COMPACTOR_CRUD_QUERY_BASED + " to true)."); - } - if (!type.equals("minor") && !type.equals("major")) { throw new SemanticException(ErrorMsg.INVALID_COMPACTION_TYPE.getMsg()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index e26794355c..bb70db4524 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.regex.Matcher; import org.apache.hadoop.conf.Configuration; @@ -132,7 +131,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag job.setOutputCommitter(CompactorOutputCommitter.class); job.set(FINAL_LOCATION, sd.getLocation()); - job.set(TMP_LOCATION, generateTmpPath(sd)); + job.set(TMP_LOCATION, 
QueryCompactor.Util.generateTmpPath(sd)); job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat()); job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat()); job.setBoolean(IS_COMPRESSED, sd.isCompressed()); @@ -232,11 +231,10 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor } JobConf job = createBaseJobConf(conf, jobName, t, sd, writeIds, ci); + if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) { + return; + } - // Figure out and encode what files we need to read. We do this before getSplits - // because as part of this we discover our minimum and maximum transactions, - // and discovering that in getSplits is too late as we then have no way to pass it to our - // mapper. List parsedDeltas = dir.getCurrentDirectories(); int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA); if (parsedDeltas.size() > maxDeltasToHandle) { @@ -296,10 +294,6 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor su.gatherStats(); } - private String generateTmpPath(StorageDescriptor sd) { - return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); - } - /** * @param baseDir if not null, it's either table/partition root folder or base_xxxx. * If it's base_xxxx, it's in dirsToSearch, else the actual original files diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 2253fda6c6..6017fd31b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -106,12 +106,6 @@ public void run() { LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); for (CompactionInfo ci : potentials) { - // Disable minor compaction for query based compactor - if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { - LOG.debug("Not compacting: " + ci.getFullPartitionName() - + ", as query based compaction currently does not " + "support minor compactions."); - continue; - } LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 38689ef86c..f238eb5dd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -17,112 +17,73 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.DriverUtils; -import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; 
import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.List; /** * Class responsible of running query based major compaction. */ -class MajorQueryCompactor extends QueryCompactor { - - private static final Logger LOG = LoggerFactory.getLogger(MajorQueryCompactor.class.getName()); +final class MajorQueryCompactor extends QueryCompactor { - @Override - void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); - String user = UserGroupInformation.getCurrentUser().getShortUserName(); - SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true); - // Set up the session for driver. HiveConf conf = new HiveConf(hiveConf); + // Set up the session for driver. conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); /* * For now, we will group splits on tez so that we end up with all bucket files, * with same bucket number in one map task. */ conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_"; String tmpTableName = tmpPrefix + System.currentTimeMillis(); - long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); - try { - // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234 - String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, table); - LOG.info("Running major compaction query into temp table with create definition: {}", query); - try { - DriverUtils.runOnDriver(conf, user, sessionState, query); - } catch (Exception ex) { - Throwable cause = ex; - while (cause != null && !(cause instanceof AlreadyExistsException)) { - cause = cause.getCause(); - } - if (cause == null) { - throw new IOException(ex); - } - } - query = buildCrudMajorCompactionQuery(table, partition, tmpTableName); - LOG.info("Running major compaction via query: {}", query); - /* - * This will create bucket files like: - * db/db_tmp_compactor_tbl_1234/00000_0 - * db/db_tmp_compactor_tbl_1234/00001_0 - */ - DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId); - /* - * This achieves a final layout like (wid is the highest valid write id for this major compaction): - * db/tbl/ptn/base_wid/bucket_00000 - * db/tbl/ptn/base_wid/bucket_00001 - */ - org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); - String tmpLocation = tempTable.getSd().getLocation(); - commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds, - compactorTxnId); - } catch (HiveException e) { - LOG.error("Error doing query based major compaction", e); - throw new IOException(e); - } finally { - try { - DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); - } catch (HiveException e) { - LOG.error("Unable to delete drop temp table {} which was created for running major compaction", 
tmpTableName); - LOG.error(ExceptionUtils.getStackTrace(e)); - } - } + List createQueries = getCreateQueries(tmpTableName, table); + List compactionQueries = getCompactionQueries(table, partition, tmpTableName); + List dropQueries = getDropQueries(tmpTableName); + runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries, + compactionQueries, dropQueries); + } + + /** + * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn. + * Since the temp table is a non-transactional table, it has file names in the "original" format. + * Also, due to split grouping in + * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], + * Configuration, boolean)}, we will end up with one file per bucket. + */ + @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { + org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); + Util.moveContents(new Path(tempTable.getSd().getLocation()), new Path(dest), true, false, conf, actualWriteIds, + compactorTxnId); } /** * Note on ordering of rows in the temp table: - * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending). + * We need each final bucket file sorted by original write id (ascending), bucket (ascending) and row id (ascending). * (current write id will be the same as original write id). * We will be achieving the ordering via a custom split grouper for compactor. * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description. * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)} * for details on the mechanism. */ - private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t) { + private List getCreateQueries(String fullName, Table t) { StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" ("); // Acid virtual columns query.append( @@ -141,10 +102,10 @@ private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t query.append(">)"); query.append(" stored as orc"); query.append(" tblproperties ('transactional'='false')"); - return query.toString(); + return Lists.newArrayList(query.toString()); } - private String buildCrudMajorCompactionQuery(Table t, Partition p, String tmpName) { + private List getCompactionQueries(Table t, Partition p, String tmpName) { String fullName = t.getDbName() + "." + t.getTableName(); StringBuilder query = new StringBuilder("insert into table " + tmpName + " "); StringBuilder filter = new StringBuilder(); @@ -165,53 +126,10 @@ private String buildCrudMajorCompactionQuery(Table t, Partition p, String tmpNam query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName()); } query.append(") from ").append(fullName).append(filter); - return query.toString(); + return Lists.newArrayList(query.toString()); } - /** - * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn. - * Since the temp table is a non-transactional table, it has file names in the "original" format. 
- * Also, due to split grouping in - * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}, - * we will end up with one file per bucket. - */ - private void commitCrudMajorCompaction(String from, String tmpTableName, String to, HiveConf conf, - ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { - Path fromPath = new Path(from); - Path toPath = new Path(to); - Path tmpTablePath = new Path(fromPath, tmpTableName); - FileSystem fs = fromPath.getFileSystem(conf); - // Assume the high watermark can be used as maximum transaction ID. - long maxTxn = actualWriteIds.getHighWatermark(); - // Get a base_wid path which will be the new compacted base - AcidOutputFormat.Options options = - new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) - .statementId(-1); - Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); - if (!fs.exists(fromPath)) { - LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir); - fs.mkdirs(newBaseDir); - return; - } - LOG.info("Moving contents of {} to {}", tmpTablePath, to); - /* - * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on - * TODO/ToThink: - * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination? - */ - // List buckCols = t.getSd().getBucketCols(); - FileStatus[] children = fs.listStatus(fromPath); - for (FileStatus filestatus : children) { - String originalFileName = filestatus.getPath().getName(); - // This if() may not be required I think... - if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) { - int bucketId = AcidUtils.parseBucketId(filestatus.getPath()); - options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn) - .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId); - Path finalBucketFile = AcidUtils.createFilename(toPath, options); - Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false); - } - } - fs.delete(fromPath, true); + private List getDropQueries(String tmpTableName) { + return Lists.newArrayList("drop table if exists " + tmpTableName); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java new file mode 100644 index 0000000000..0eee0161a8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.common.util.Ref; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Class responsible for handling query based minor compaction. + */ +final class MinorQueryCompactor extends QueryCompactor { + + public static final String MINOR_COMP_TBL_PROP = "queryminorcomp"; + private static final Logger LOG = LoggerFactory.getLogger(MinorQueryCompactor.class.getName()); + + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + LOG.info("Running query based minor compaction"); + AcidUtils + .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); + AcidUtils.Directory dir = AcidUtils + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, + table.getParameters(), false); + if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) { + return; + } + // Set up the session for driver. + HiveConf conf = new HiveConf(hiveConf); + conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false); + conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false); + String tmpTableName = + table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis(); + List createQueries = getCreateQueries(table, tmpTableName, dir, writeIds); + List compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds()); + List dropQueries = getDropQueries(tmpTableName); + + runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries, + compactionQueries, dropQueries); + } + + @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { + // get result temp tables; + String deltaTableName = AcidUtils.DELTA_PREFIX + tmpTableName + "_result"; + commitCompaction(deltaTableName, dest, false, conf, actualWriteIds, compactorTxnId); + + String deleteDeltaTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableName + "_result"; + commitCompaction(deleteDeltaTableName, dest, true, conf, actualWriteIds, compactorTxnId); + } + + /** + * Get a list of create/alter table queries. These tables serves as temporary data source for query based + * minor compaction. The following tables are created: + *
    + *
+ * 1. tmpDelta, tmpDeleteDelta - temporary, external, partitioned table, having the schema of an ORC ACID file.
+ *    Each partition corresponds to exactly one delta/delete-delta directory
+ * 2. tmpDeltaResult, tmpDeleteDeltaResult - temporary table which stores the aggregated results of the minor
+ *    compaction query
+ *
+ * @param table the source table, where the compaction is running on + * @param tempTableBase an unique identifier which is used to create delta/delete-delta temp tables + * @param dir the directory, where the delta directories resides + * @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction + * @return list of create/alter queries, always non-null + */ + private List getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir, + ValidWriteIdList writeIds) { + List queries = new ArrayList<>(); + // create delta temp table + String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase; + queries.add(buildCreateTableQuery(table, tmpTableName, true, true, false)); + buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add); + // create delta result temp table + queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, false, true)); + + // create delete delta temp tables + String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase; + queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, true, false)); + buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add); + // create delete delta result temp table + queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, false, true)); + return queries; + } + + /** + * Helper method, which builds a create table query. The create query is customized based on the input arguments, but + * the schema of the table is the same as an ORC ACID file schema. + * @param table he source table, where the compaction is running on + * @param newTableName name of the table to be created + * @param isExternal true, if new table should be external + * @param isPartitioned true, if new table should be partitioned + * @param isBucketed true, if the new table should be bucketed + * @return a create table statement, always non-null. Example: + *

+ * if the source table schema is: (a:int, b:int)
+ * the corresponding create statement is:
+ * CREATE TEMPORARY EXTERNAL TABLE tmp_table (`operation` int, `originalTransaction` bigint, `bucket` int,
+ * `rowId` bigint, `currentTransaction` bigint, `row` struct<`a` :int, `b` :int>)
+ * PARTITIONED BY (`file_name` string)
+ * STORED AS ORC TBLPROPERTIES ('transactional'='false','queryminorcomp'='true');
+ *

+ */ + private String buildCreateTableQuery(Table table, String newTableName, boolean isExternal, boolean isPartitioned, + boolean isBucketed) { + StringBuilder query = new StringBuilder("create temporary "); + if (isExternal) { + query.append("external "); + } + query.append("table ").append(newTableName).append(" ("); + // Acid virtual columns + query.append( + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, " + + "`row` struct<"); + List cols = table.getSd().getCols(); + boolean isFirst = true; + // Actual columns + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` ").append(":").append(col.getType()); + } + query.append(">)"); + if (isPartitioned) { + query.append(" partitioned by (`file_name` string)"); + } + int bucketingVersion = 0; + if (isBucketed) { + int numBuckets = 1; + try { + org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName()); + numBuckets = Math.max(t.getNumBuckets(), numBuckets); + bucketingVersion = t.getBucketingVersion(); + } catch (HiveException e) { + LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.", table.getTableName()); + } finally { + query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)") + .append(" into ").append(numBuckets).append(" buckets"); + } + } + + query.append(" stored as orc"); + query.append(" tblproperties ('transactional'='false'"); + query.append(", '"); + query.append(MINOR_COMP_TBL_PROP); + query.append("'='true'"); + if (isBucketed) { + query.append(", 'bucketing_version'='") + .append(bucketingVersion) + .append("')"); + } else { + query.append(")"); + } + return query.toString(); + } + + /** + * Builds an alter table query, which adds partitions pointing to location of delta directories. + * @param tableName name of the to be altered table + * @param dir the parent directory of delta directories + * @param validWriteIdList list of valid write IDs + * @param isDeleteDelta if true, only the delete delta directories will be mapped as new partitions, otherwise only + * the delta directories + * @return alter table statement wrapped in {@link Optional}. + */ + private Optional buildAlterTableQuery(String tableName, AcidUtils.Directory dir, + ValidWriteIdList validWriteIdList, boolean isDeleteDelta) { + // add partitions + if (!dir.getCurrentDirectories().isEmpty()) { + long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); + long highWatermark = validWriteIdList.getHighWatermark(); + List deltas = dir.getCurrentDirectories().stream().filter( + delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark + && delta.getMinWriteId() >= minWriteID) + .collect(Collectors.toList()); + if (!deltas.isEmpty()) { + StringBuilder query = new StringBuilder().append("alter table ").append(tableName); + query.append(" add "); + deltas.forEach( + delta -> query.append("partition (file_name='").append(delta.getPath().getName()).append("') location '") + .append(delta.getPath()).append("' ")); + return Optional.of(query.toString()); + } + } + return Optional.empty(); + } + + /** + * Get a list of compaction queries which fills up the delta/delete-delta temporary result tables. 
+ * @param tmpTableBase a unique identifier, which helps to find all the temporary tables + * @param invalidWriteIds list of invalid write IDs. This list is used to filter out aborted/open transactions + * @return list of compaction queries, always non-null + */ + private List<String> getCompactionQueries(String tmpTableBase, long[] invalidWriteIds) { + List<String> queries = new ArrayList<>(); + String sourceTableName = AcidUtils.DELTA_PREFIX + tmpTableBase; + String resultTableName = sourceTableName + "_result"; + queries.add(buildCompactionQuery(sourceTableName, resultTableName, invalidWriteIds)); + String sourceDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase; + String resultDeleteTableName = sourceDeleteTableName + "_result"; + queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, invalidWriteIds)); + return queries; + } + + /** + * Build a minor compaction query. A compaction query selects the content of the source temporary table and inserts + * it into the result table, filtering out all rows which belong to open/aborted transactions. + * @param sourceTableName the name of the source table + * @param resultTableName the name of the result table + * @param invalidWriteIds list of invalid write IDs + * @return compaction query, always non-null + */ + private String buildCompactionQuery(String sourceTableName, String resultTableName, long[] invalidWriteIds) { + StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName) + .append(" select `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from ") + .append(sourceTableName); + if (invalidWriteIds.length > 0) { + query.append(" where `originalTransaction` not in (") + .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")"); + } + + return query.toString(); + } + + /** + * Get the list of drop table statements. + * @param tmpTableBase a unique identifier, which helps to find all the tables used in query based minor compaction + * @return list of drop table statements, always non-null + */ + private List<String> getDropQueries(String tmpTableBase) { + List<String> queries = new ArrayList<>(); + String dropStm = "drop table if exists "; + queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase); + queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase); + queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase + "_result"); + queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result"); + return queries; + } + + /** + * Creates the delta directory and moves the result files.
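+ * For example, with valid write IDs up to a high watermark of 5 and compactor transaction 27, the results would end
+ * up in a directory such as delta_0000001_0000005_v0000027 (or the delete_delta_ equivalent); the name shown is only
+ * illustrative, the real one is derived at runtime from the write ID range and the compactor transaction.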
+ * @param deltaTableName name of the temporary table, where the results are stored + * @param dest destination path, where the result should be moved + * @param isDeleteDelta is the destination a delete delta directory + * @param conf hive configuration + * @param actualWriteIds list of valid write Ids + * @param compactorTxnId transaction Id of the compaction + * @throws HiveException the result files cannot be moved + * @throws IOException the destination delta directory cannot be created + */ + private void commitCompaction(String deltaTableName, String dest, boolean isDeleteDelta, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws HiveException, IOException { + org.apache.hadoop.hive.ql.metadata.Table deltaTable = Hive.get().getTable(deltaTableName); + Util.moveContents(new Path(deltaTable.getSd().getLocation()), new Path(dest), false, isDeleteDelta, conf, + actualWriteIds, compactorTxnId); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 9b8420902f..bad5d00a8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -17,15 +17,12 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; @@ -35,14 +32,12 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.DriverUtils; import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.util.DirectionUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.Ref; @@ -52,7 +47,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -61,12 +55,11 @@ /** * Class responsible to run query based major compaction on insert only tables. 
*/ -class MmMajorQueryCompactor extends QueryCompactor { +final class MmMajorQueryCompactor extends QueryCompactor { private static final Logger LOG = LoggerFactory.getLogger(MmMajorQueryCompactor.class.getName()); - @Override - void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table .getTableName()); @@ -82,52 +75,58 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD return; } - try { - String tmpLocation = Util.generateTmpPath(storageDescriptor); - Path baseLocation = new Path(tmpLocation, "_base"); + if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) { + return; + } - // Set up the session for driver. - HiveConf driverConf = new HiveConf(hiveConf); - driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); - driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused - //thinking it already has a txn opened + String tmpLocation = Util.generateTmpPath(storageDescriptor); + Path baseLocation = new Path(tmpLocation, "_base"); - String user = UserGroupInformation.getCurrentUser().getShortUserName(); - SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true); + // Set up the session for driver. + HiveConf driverConf = new HiveConf(hiveConf); + driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); - // Note: we could skip creating the table and just add table type stuff directly to the - // "insert overwrite directory" command if there were no bucketing or list bucketing. - String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_"; - String tmpTableName; - while (true) { - tmpTableName = tmpPrefix + System.currentTimeMillis(); - String query = - buildMmCompactionCtQuery(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(), - baseLocation.toString()); - LOG.info("Compacting a MM table into " + query); - try { - DriverUtils.runOnDriver(driverConf, user, sessionState, query); - break; - } catch (Exception ex) { - Throwable cause = ex; - while (cause != null && !(cause instanceof AlreadyExistsException)) { - cause = cause.getCause(); - } - if (cause == null) { - throw new IOException(ex); - } - } - } - String query = buildMmCompactionQuery(table, partition, tmpTableName); - LOG.info("Compacting a MM table via " + query); - long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf); - DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId); - commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId); - DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName); - } catch (HiveException e) { - LOG.error("Error compacting a MM table", e); - throw new IOException(e); + // Note: we could skip creating the table and just add table type stuff directly to the + // "insert overwrite directory" command if there were no bucketing or list bucketing. 
+ String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_"; + String tmpTableName = tmpPrefix + System.currentTimeMillis(); + List createTableQueries = + getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(), + baseLocation.toString()); + List compactionQueries = getCompactionQueries(table, partition, tmpTableName); + List dropQueries = getDropQueries(tmpTableName); + runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo, + createTableQueries, compactionQueries, dropQueries); + } + + /** + * Note: similar logic to the main committer; however, no ORC versions and stuff like that. + * @param dest The final directory; basically a SD directory. Not the actual base/delta. + * @param compactorTxnId txn that the compactor started + */ + @Override + protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { + org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); + String from = tempTable.getSd().getLocation(); + Path fromPath = new Path(from), toPath = new Path(dest); + FileSystem fs = fromPath.getFileSystem(conf); + // Assume the high watermark can be used as maximum transaction ID. + //todo: is that true? can it be aborted? does it matter for compaction? probably OK since + //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs. + long maxTxn = actualWriteIds.getHighWatermark(); + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) + .statementId(-1).visibilityTxnId(compactorTxnId); + Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); + fs.mkdirs(newBaseDir); + return; } + LOG.info("Moving contents of " + from + " to " + dest); + fs.rename(fromPath, newBaseDir); + fs.delete(fromPath, true); } // Remove the directories for aborted transactions only @@ -145,7 +144,7 @@ private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throw } } - private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescriptor sd, String location) { + private List getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) { StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("("); List cols = t.getSd().getCols(); boolean isFirst = true; @@ -229,11 +228,11 @@ private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescrip query.append(", "); } query.append("'transactional'='false')"); - return query.toString(); + return Lists.newArrayList(query.toString()); } - private String buildMmCompactionQuery(Table t, Partition p, String tmpName) { + private List getCompactionQueries(Table t, Partition p, String tmpName) { String fullName = t.getDbName() + "." + t.getTableName(); // ideally we should make a special form of insert overwrite so that we: // 1) Could use fast merge path for ORC and RC. 
@@ -260,40 +259,11 @@ private String buildMmCompactionQuery(Table t, Partition p, String tmpName) { query.append("select *"); } query.append(" from ").append(fullName).append(filter); - return query.toString(); + return Lists.newArrayList(query.toString()); } - /** - * Note: similar logic to the main committer; however, no ORC versions and stuff like that. - * @param from The temp directory used for compactor output. Not the actual base/delta. - * @param to The final directory; basically a SD directory. Not the actual base/delta. - * @param compactorTxnId txn that the compactor started - */ - private void commitMmCompaction(String from, String to, Configuration conf, ValidWriteIdList actualWriteIds, - long compactorTxnId) throws IOException { - Path fromPath = new Path(from), toPath = new Path(to); - FileSystem fs = fromPath.getFileSystem(conf); - // Assume the high watermark can be used as maximum transaction ID. - //todo: is that true? can it be aborted? does it matter for compaction? probably OK since - //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs. - long maxTxn = actualWriteIds.getHighWatermark(); - AcidOutputFormat.Options options = - new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) - .statementId(-1).visibilityTxnId(compactorTxnId); - Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); - if (!fs.exists(fromPath)) { - LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); - fs.mkdirs(newBaseDir); - return; - } - LOG.info("Moving contents of " + from + " to " + to); - FileStatus[] children = fs.listStatus(fromPath); - if (children.length != 1) { - throw new IOException("Unexpected files in the source: " + Arrays.toString(children)); - } - FileStatus dirPath = children[0]; - fs.rename(dirPath.getPath(), newBaseDir); - fs.delete(fromPath, true); + private List getDropQueries(String tmpTableName) { + return Lists.newArrayList("drop table if exists " + tmpTableName); } private static Set getHiveMetastoreConstants() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 1eab5b888d..9896df3bb6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -17,18 +17,31 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import 
java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -53,6 +66,78 @@ abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException; + /** + * This is the final step of the compaction, which can vary based on compaction type. Usually this involves some file + * operation. + * @param dest The final directory; basically an SD directory. + * @param tmpTableName The name of the temporary table. + * @param conf hive configuration. + * @param actualWriteIds valid write Ids used to fetch the high watermark Id. + * @param compactorTxnId transaction that the compactor started. + * @throws IOException failed to execute file system operation. + * @throws HiveException failed to execute file operation within hive. + */ + protected abstract void commitCompaction(String dest, String tmpTableName, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException; + + /** + * Run all the queries which perform the compaction. + * @param conf hive configuration, must be not null. + * @param tmpTableName The name of the temporary table. + * @param storageDescriptor the resolved storage descriptor. + * @param writeIds valid write IDs used to filter rows while they're being read for compaction. + * @param compactionInfo provides info about the type of compaction. + * @param createQueries collection of queries which create the temporary tables. + * @param compactionQueries collection of queries which use data from the original table and write into temporary + * tables. + * @param dropQueries queries which drop the temporary tables. + * @throws IOException error during the run of the compaction. + */ + protected void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries, + List<String> compactionQueries, List<String> dropQueries) throws IOException { + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true); + long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); + try { + for (String query : createQueries) { + try { + LOG.info("Running {} compaction query into temp table with query: {}", + compactionInfo.isMajorCompaction() ? "major" : "minor", query); + DriverUtils.runOnDriver(conf, user, sessionState, query); + } catch (Exception ex) { + Throwable cause = ex; + while (cause != null && !(cause instanceof AlreadyExistsException)) { + cause = cause.getCause(); + } + if (cause == null) { + throw new IOException(ex); + } + } + } + for (String query : compactionQueries) { + LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query); + DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId); + } + commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId); + } catch (HiveException e) { + LOG.error("Error doing query based {} compaction", compactionInfo.isMajorCompaction() ? "major" : "minor", e); + throw new IOException(e); + } finally { + try { + for (String query : dropQueries) { + LOG.info("Running {} compaction query into temp table with query: {}", + compactionInfo.isMajorCompaction() ? 
"major" : "minor", query); + DriverUtils.runOnDriver(conf, user, sessionState, query); + } + } catch (HiveException e) { + LOG.error("Unable to drop temp table {} which was created for running {} compaction", tmpTableName, + compactionInfo.isMajorCompaction() ? "major" : "minor"); + LOG.error(ExceptionUtils.getStackTrace(e)); + } + } + } + /** * Collection of some helper functions. */ @@ -120,5 +205,76 @@ public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor s static String generateTmpPath(StorageDescriptor sd) { return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); } + + /** + * Check whether the result directory exists and contains compacted result files. If no splits are found, create + * an empty directory at the destination path, matching a base/delta directory naming convention. + * @param sourcePath the checked source location + * @param destPath the destination, where the new directory should be created + * @param isMajorCompaction true, if called from a major compaction + * @param isDeleteDelta true, if the output is used as a delete delta directory + * @param conf hive configuration + * @param validWriteIdList list of valid write IDs, used to derive the minimum open write ID and the high watermark + * @return true, if the source directory exists; false, if it does not and an empty destination directory was created + * @throws IOException the new directory cannot be created + */ + private static boolean resultHasSplits(Path sourcePath, Path destPath, boolean isMajorCompaction, + boolean isDeleteDelta, HiveConf conf, ValidWriteIdList validWriteIdList) throws IOException { + FileSystem fs = sourcePath.getFileSystem(conf); + long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); + long highWatermark = validWriteIdList.getHighWatermark(); + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta) + .isCompressed(false).minimumWriteId(minOpenWriteId) + .maximumWriteId(highWatermark).bucket(0).statementId(-1); + Path newDeltaDir = AcidUtils.createFilename(destPath, options).getParent(); + if (!fs.exists(sourcePath)) { + LOG.info("{} not found. Assuming 0 splits. Creating {}", sourcePath, newDeltaDir); + fs.mkdirs(newDeltaDir); + return false; + } + return true; + } + + /** + * Create the base/delta directory matching the naming conventions and move the result files of the compaction + * into it. + * @param sourcePath location of the result files + * @param destPath destination path of the result files, without the base/delta directory + * @param isMajorCompaction is this called from a major compaction + * @param isDeleteDelta is the destination a delete delta directory + * @param conf hive configuration + * @param validWriteIdList list of valid write Ids + * @param compactorTxnId transaction Id of the compaction + * @throws IOException the destination directory cannot be created + * @throws HiveException the result files cannot be moved to the destination directory + */ + static void moveContents(Path sourcePath, Path destPath, boolean isMajorCompaction, boolean isDeleteDelta, + HiveConf conf, ValidWriteIdList validWriteIdList, long compactorTxnId) throws IOException, HiveException { + if (!resultHasSplits(sourcePath, destPath, isMajorCompaction, isDeleteDelta, conf, validWriteIdList)) { + return; + } + LOG.info("Moving contents of {} to {}", sourcePath, destPath); + FileSystem fs = sourcePath.getFileSystem(conf); + long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 
1 : validWriteIdList.getMinOpenWriteId(); + long highWatermark = validWriteIdList.getHighWatermark(); + for (FileStatus fileStatus : fs.listStatus(sourcePath)) { + String originalFileName = fileStatus.getPath().getName(); + if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) { + Optional bucketId = AcidUtils.parseBucketIdFromRow(fs, fileStatus.getPath()); + if (bucketId.isPresent()) { + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta) + .isCompressed(false).minimumWriteId(minOpenWriteId) + .maximumWriteId(highWatermark).bucket(bucketId.get()).statementId(-1) + .visibilityTxnId(compactorTxnId); + Path finalBucketFile = AcidUtils.createFilename(destPath, options); + Hive.moveFile(conf, fileStatus.getPath(), finalBucketFile, true, false, false); + } + } + } + fs.delete(sourcePath, true); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java index 41cb4b64fb..2f2bb21a13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -27,11 +27,22 @@ */ final class QueryCompactorFactory { + /** + * Factory class, no need to expose constructor. + */ private QueryCompactorFactory() { } /** - * Get an instance of {@link QueryCompactor}. + * Get an instance of {@link QueryCompactor}. At the moment the following implementors can be fetched: + *

+ * {@link MajorQueryCompactor} - handles query based major compaction + *
+ * {@link MinorQueryCompactor} - handles query based minor compaction + *
+ * {@link MmMajorQueryCompactor} - handles query based major compaction for micro-managed (insert only) tables + *
+ *

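+ * Which implementation is returned depends on the table type, the requested compaction type and the configuration;
+ * for example, as the checks below show, minor compaction of a full CRUD table is only handled here when
+ * {@link HiveConf.ConfVars#COMPACTOR_CRUD_QUERY_BASED} is enabled and the execution engine is Tez.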
* @param table the table, on which the compaction should be running, must be not null. * @param configuration the hive configuration, must be not null. * @param compactionInfo provides insight about the type of compaction, must be not null. @@ -42,8 +53,10 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com .getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { if (compactionInfo.isMajorCompaction()) { return new MajorQueryCompactor(); - } else { - throw new RuntimeException("Query based compaction is not currently supported for minor compactions"); + } else if (!compactionInfo.isMajorCompaction() && "tez" + .equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) { + // query based minor compaction is only supported on tez + return new MinorQueryCompactor(); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index d4c9121c9f..aabf15c8d4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -506,11 +506,12 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); - // Initiate a minor compaction request on the table. + // Initiate a major compaction request on the table. runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'"); // Run worker. runWorker(hiveConf); + verifyDirAndResult(2, true); // Run Cleaner. runCleaner(hiveConf); @@ -519,6 +520,7 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); + verifyDirAndResult(0, true); } @Test @@ -612,19 +614,32 @@ public void testSnapshotIsolationWithAbortedTxnOnMmTable() throws Exception { } private void verifyDirAndResult(int expectedDeltas) throws Exception { + verifyDirAndResult(expectedDeltas, false); + } + private void verifyDirAndResult(int expectedDeltas, boolean expectBaseDir) throws Exception { FileSystem fs = FileSystem.get(hiveConf); // Verify the content of subdirs FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); int sawDeltaTimes = 0; + int sawBaseTimes = 0; for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); - sawDeltaTimes++; - FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(1, files.length); - Assert.assertTrue(files[0].getPath().getName().equals("000000_0")); + if (status[i].getPath().getName().matches("delta_.*")) { + sawDeltaTimes++; + FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + Assert.assertEquals(1, files.length); + Assert.assertEquals("000000_0", files[0].getPath().getName()); + } else { + sawBaseTimes++; + } } + Assert.assertEquals(expectedDeltas, sawDeltaTimes); + if (expectBaseDir) { + Assert.assertEquals("1 base directory expected", 1, sawBaseTimes); + } else { + Assert.assertEquals("0 base 
directories expected", 0, sawBaseTimes); + } // Verify query result int [][] resultData = new int[][] {{1,2}, {3,4}}; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java index d6435342aa..1599548da0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hive.ql; -import org.apache.hadoop.hive.conf.HiveConf; import org.junit.Before; -import org.junit.Test; /** * Same as TestTxnCommands2 but tests ACID tables with vectorization turned on by