diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e92466f..657175b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,7 +18,32 @@ package org.apache.hadoop.hive.conf; -import com.google.common.base.Joiner; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URL; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.security.auth.login.LoginException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -42,27 +67,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; - -import java.io.*; -import java.net.URI; -import java.net.URL; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.base.Joiner; /** * Hive Configuration. @@ -260,6 +265,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, HiveConf.ConfVars.HIVE_TXN_MANAGER, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, @@ -1757,6 +1763,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0, + "Sets the operational properties that control the appropriate behavior for various " + + "versions of Hive ACID subsystem. Setting it to zero will turn on the legacy mode for " + + "ACID, while setting it to one will enable split-update feature found in newer version " + + "of Hive ACID subsystem. (See HIVE-14035 for details)"), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. 
If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + "rejected, until this number goes below the limit."), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 14f7316..b970153 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -133,6 +133,9 @@ public void configureInputJobProperties(TableDesc tableDesc, boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties); AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable); + AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(tableProperties); + AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties); } } catch (IOException e) { throw new IllegalStateException("Failed to set output path", e); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 974c6b8..b8615cb 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Properties; public abstract class AbstractRecordWriter implements RecordWriter { @@ -265,10 +266,16 @@ public void closeBatch() throws StreamingIOFailure { private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { + // Initialize table properties from the table parameters. This is required because the table + // may define certain table parameters that may be required while writing. The table parameter + // 'transactional_properties' is one such example. 
+ Properties tblProperties = new Properties(); + tblProperties.putAll(tbl.getParameters()); return outf.getRecordUpdater(partitionPath, new AcidOutputFormat.Options(conf) .inspector(getSerde().getObjectInspector()) .bucket(bucketId) + .tableProperties(tblProperties) .minimumTransactionId(minTxnId) .maximumTransactionId(maxTxnID) .statementId(-1) diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index ca2a912..b37fe65 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -17,6 +17,19 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -68,20 +81,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.TimeUnit; - /** */ public class TestCompactor { @@ -857,6 +856,297 @@ public void majorCompactAfterAbort() throws Exception { } } + @Test + public void majorCompactWhileStreamingForSplitUpdate() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true', " + + "'transactional_properties'='default') ", driver); // this turns on split-update U=D+I + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + } + String name = stat[0].getPath().getName(); + Assert.assertEquals(name, "base_0000004"); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); + } finally { + connection.close(); + } + } + + @Test + public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3 + executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver); + + // Now, compact -> Compaction produces a single range for both delta and delete delta + // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3 + // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3. + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + // Verify that we have got correct set of deltas. 
+ FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[stat.length]; + Path minorCompactedDelta = null; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = stat[i].getPath().getName(); + if (deltas[i].equals("delta_0000001_0000003")) { + minorCompactedDelta = stat[i].getPath(); + } + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"}; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L); + + // Verify that we have got correct set of delete_deltas. + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L); + } + + @Test + public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + // Now, compact + // One important thing to note in this test is that minor compaction always produces + // delta_x_y and a counterpart delete_delta_x_y, even when there are no delete_delta events. + // Such a choice has been made to simplify processing of AcidUtils.getAcidState(). 
+ + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + // Verify that we have got correct set of deltas. + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[stat.length]; + Path minorCompactedDelta = null; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = stat[i].getPath().getName(); + if (deltas[i].equals("delta_0000001_0000002")) { + minorCompactedDelta = stat[i].getPath(); + } + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"}; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L); + + // Verify that we have got correct set of delete_deltas. + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // There should be no rows in the delete_delta because there have been no delete events. + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + } + + @Test + public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] names = new String[stat.length]; + Path resultFile = null; + for (int i = 0; i < names.length; i++) { + names[i] = stat[i].getPath().getName(); + if (names[i].equals("delta_0000001_0000004")) { + resultFile = stat[i].getPath(); + } + } + Arrays.sort(names); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L); + + // Verify that we have got correct set of delete_deltas also + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // There should be no rows in the delete_delta because there have been no delete events. + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + + } finally { + connection.close(); + } + } + /** * Users have the choice of specifying compaction related tblproperties either in CREATE TABLE * statement or in ALTER TABLE .. COMPACT statement. This tests both cases. 
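Reviewer note (illustrative, not part of the patch): the tests above rely on 'transactional_properties'='default' turning on split-update (U = D + I). Below is a minimal sketch of how that table property is decoded by the AcidUtils.AcidOperationalProperties API introduced later in this patch; the wrapper class, partition-free setup, and printed values are assumptions added purely for illustration.

// Illustrative sketch only -- exercises the AcidUtils.AcidOperationalProperties API added by this patch.
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidOperationalPropertiesExample {
  public static void main(String[] args) {
    // 'transactional_properties'='default' enables split-update (an update becomes delete + insert).
    AcidUtils.AcidOperationalProperties def =
        AcidUtils.AcidOperationalProperties.parseString("default");
    System.out.println(def.isSplitUpdate());      // expected: true
    System.out.println(def.isHashBasedMerge());   // expected: false
    System.out.println(def.toInt());              // expected: 1 (SPLIT_UPDATE_BIT)

    // 'transactional_properties'='legacy' (or no property at all) keeps the pre-patch behavior.
    AcidUtils.AcidOperationalProperties legacy =
        AcidUtils.AcidOperationalProperties.parseString("legacy");
    System.out.println(legacy.toInt());           // expected: 0

    // The integer form is what hive.txn.operational.properties carries at runtime.
    AcidUtils.AcidOperationalProperties fromConf =
        AcidUtils.AcidOperationalProperties.parseInt(1);
    System.out.println(fromConf.isSplitUpdate()); // expected: true
  }
}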
diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 4d92b73..84cc4f6 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -1474,5 +1474,6 @@ const string FILE_OUTPUT_FORMAT = "file.outputformat", const string META_TABLE_STORAGE = "storage_handler", const string TABLE_IS_TRANSACTIONAL = "transactional", const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction", +const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties", diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp index f982bf2..1cbd176 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp @@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() { TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } }}} // namespace diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h index ae14bd1..3d068c3 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h @@ -38,6 +38,7 @@ class hive_metastoreConstants { std::string META_TABLE_STORAGE; std::string TABLE_IS_TRANSACTIONAL; std::string TABLE_NO_AUTO_COMPACT; + std::string TABLE_TRANSACTIONAL_PROPERTIES; }; extern const hive_metastoreConstants g_hive_metastore_constants; diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index 5a666f2..8de8896 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -82,4 +82,6 @@ public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index f505208..10b1d97 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -18842,6 +18842,7 @@ final class Constant extends \Thrift\Type\TConstant { static protected $META_TABLE_STORAGE; static protected $TABLE_IS_TRANSACTIONAL; static protected $TABLE_NO_AUTO_COMPACT; + static protected $TABLE_TRANSACTIONAL_PROPERTIES; static protected function init_DDL_TIME() { return "transient_lastDdlTime"; @@ -18934,6 +18935,10 @@ final class Constant extends \Thrift\Type\TConstant { static protected function init_TABLE_NO_AUTO_COMPACT() { return "no_auto_compaction"; } + + static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() { + return "transactional_properties"; + } } diff --git metastore/src/gen/thrift/gen-py/hive_metastore/constants.py metastore/src/gen/thrift/gen-py/hive_metastore/constants.py index d1c07a5..5100236 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/constants.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/constants.py @@ -32,3 +32,4 @@ META_TABLE_STORAGE = "storage_handler" TABLE_IS_TRANSACTIONAL = "transactional" TABLE_NO_AUTO_COMPACT = "no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = 
"transactional_properties" diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb index eeccc84..6aa7143 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb @@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional" TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 3e74675..7151c22 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -17,21 +17,31 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - final class TransactionalValidationListener extends MetaStorePreEventListener { public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class); + // Values mirrored from org.apache.hadoop.hive.ql.io.AcidUtils. + // We could have imported the constants but that would create a cyclic dependency + // between hive.metastore and hive.ql, hence is duplicated to avoid that. + public static final String[] VALID_TRANSACTIONAL_PROPERTIES = { "default", "legacy" }; + TransactionalValidationListener(Configuration conf) { super(conf); } @@ -60,6 +70,8 @@ private void handle(PreCreateTableEvent context) throws MetaException { /** * once a table is marked transactional, you cannot go back. Enforce this. + * Also in current version, 'transactional_properties' of the table cannot be altered after + * the table is created. Any attempt to alter it will throw a MetaException. 
*/ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throws MetaException { Table newTable = context.getNewTable(); @@ -70,12 +82,22 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw Set keys = new HashSet<>(parameters.keySet()); String transactionalValue = null; boolean transactionalValuePresent = false; + boolean isTransactionalPropertiesPresent = false; + String transactionalPropertiesValue = null; + boolean hasValidTransactionalValue = false; + for (String key : keys) { if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { transactionalValuePresent = true; transactionalValue = parameters.get(key); parameters.remove(key); } + if(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + isTransactionalPropertiesPresent = true; + transactionalPropertiesValue = parameters.get(key); + // Do not remove the parameter yet, because we have separate initialization routine + // that will use it down below. + } } if (transactionalValuePresent) { //normalize prop name @@ -91,24 +113,52 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() + " cannot be declared transactional because it's an external table"); } - - return; + hasValidTransactionalValue = true; } + Table oldTable = context.getOldTable(); String oldTransactionalValue = null; + String oldTransactionalPropertiesValue = null; for (String key : oldTable.getParameters().keySet()) { if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { oldTransactionalValue = oldTable.getParameters().get(key); } + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + oldTransactionalPropertiesValue = oldTable.getParameters().get(key); + } } + + if (oldTransactionalValue == null ? transactionalValue == null : oldTransactionalValue.equalsIgnoreCase(transactionalValue)) { //this covers backward compat cases where this prop may have been set already - return; + hasValidTransactionalValue = true; + } + + if (!hasValidTransactionalValue) { + // if here, there is attempt to set transactional to something other than 'true' + // and NOT the same value it was before + throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); + } + + if (isTransactionalPropertiesPresent) { + // Now validate transactional_properties for the table. + if (oldTransactionalValue == null) { + // If this is the first time the table is being initialized to 'transactional=true', + // any valid value can be set for the 'transactional_properties'. + initializeTransactionalProperties(newTable); + } else { + // If the table was already marked as 'transactional=true', then the new value of + // 'transactional_properties' must match the old value. Any attempt to alter the previous + // value will throw an error. An exception will still be thrown if the previous value was + // null and an attempt is made to set it. This behaviour can be changed in the future. 
+ if (oldTransactionalPropertiesValue == null + || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) { + throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be " + + "altered after the table is created"); + } + } } - // if here, there is attempt to set transactional to something other than 'true' - // and NOT the same value it was before - throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); } /** @@ -157,6 +207,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + initializeTransactionalProperties(newTable); return; } @@ -187,4 +238,51 @@ private boolean conformToAcid(Table table) throws MetaException { return true; } -} \ No newline at end of file + + private void initializeTransactionalProperties(Table table) throws MetaException { + // All new versions of Acid tables created after the introduction of Acid version/type system + // can have TRANSACTIONAL_PROPERTIES property defined. This parameter can be used to change + // the operational behavior of ACID. However if this parameter is not defined, the new Acid + // tables will still behave as the old ones. This is done so to preserve the behavior + // in case of rolling downgrade. + + // Initialize transaction table properties with default string value. + String tableTransactionalProperties = null; + + Map parameters = table.getParameters(); + if (parameters != null) { + Set keys = new HashSet<>(parameters.keySet()); + for (String key : keys) { + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + tableTransactionalProperties = parameters.get(key).toLowerCase(); + parameters.remove(key); + String validationError = validateTransactionalProperties(tableTransactionalProperties); + if (validationError != null) { + throw new MetaException("Invalid transactional properties specified for the " + + "table with the error " + validationError); + } + break; + } + } + } + + if (tableTransactionalProperties != null) { + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + tableTransactionalProperties); + } + } + + private String validateTransactionalProperties(String transactionalProperties) { + boolean isValid = false; + for (String validString : VALID_TRANSACTIONAL_PROPERTIES) { + if (transactionalProperties.equals(validString)) { + isValid = true; + break; + } + } + if (!isValid) { + return "unknown value for transactional_properties"; + } + return null; // All checks passed, return null. 
+ } +} diff --git orc/src/java/org/apache/orc/impl/TreeReaderFactory.java orc/src/java/org/apache/orc/impl/TreeReaderFactory.java index c4a2093..5f3ea53 100644 --- orc/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ orc/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -2037,7 +2037,7 @@ public static TreeReader createTreeReader(TypeDescription readerType, return new NullTreeReader(0); } TypeDescription.Category readerTypeCategory = readerType.getCategory(); - if (!fileType.equals(readerType) && + if (!fileType.getCategory().equals(readerTypeCategory) && (readerTypeCategory != TypeDescription.Category.STRUCT && readerTypeCategory != TypeDescription.Category.MAP && readerTypeCategory != TypeDescription.Category.LIST && diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index db6848a..8c7d99d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -81,6 +81,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext HiveInputFormat.pushFilters(job, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } sink = work.getSink(); fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 57b6c67..584eff4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -212,6 +212,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); ts.passExecContext(getExecContext()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 23a13d6..b1a1c34 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -472,6 +472,7 @@ private void initializeOperators(Map fetchOpJobConfMap) HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index dd90a95..b85b827 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hive.ql.io; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Properties; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,10 +30,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Properties; - /** 
* An extension for OutputFormats that want to implement ACID transactions. * @param the row type of the file @@ -44,6 +44,7 @@ private FileSystem fs; private ObjectInspector inspector; private boolean writingBase = false; + private boolean writingDeleteDelta = false; private boolean isCompressed = false; private Properties properties; private Reporter reporter; @@ -98,6 +99,16 @@ public Options writingBase(boolean val) { } /** + * Is this writing a delete delta directory? + * @param val is this a delete delta file? + * @return this + */ + public Options writingDeleteDelta(boolean val) { + this.writingDeleteDelta = val; + return this; + } + + /** * Provide a file system to the writer. Otherwise, the filesystem for the * path will be used. * @param fs the file system that corresponds to the the path @@ -223,7 +234,7 @@ public Options finalDestination(Path p) { this.finalDestination = p; return this; } - + public Configuration getConfiguration() { return configuration; } @@ -260,6 +271,10 @@ public boolean isWritingBase() { return writingBase; } + public boolean isWritingDeleteDelta() { + return writingDeleteDelta; + } + public int getBucket() { return bucket; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c150ec5..2b08c0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -30,22 +34,18 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. 
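Reviewer note (illustrative, not part of the patch): a small sketch of how the new Options.writingDeleteDelta(boolean) flag above is expected to combine with the AcidUtils.createFilename() changes in the hunks that follow, so that delete events land in delete_delta_* directories. The transaction ids, statement id, bucket number, and partition path are made up for illustration; the resulting path follows the naming asserted in the tests above.

// Illustrative sketch only -- intended delete-delta path construction under split-update.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class DeleteDeltaPathExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .writingDeleteDelta(true)    // new flag added by this patch
        .minimumTransactionId(3)
        .maximumTransactionId(3)
        .statementId(0)
        .bucket(0);
    // With the createFilename() changes below, this should resolve to something like
    //   /warehouse/t/part/delete_delta_0000003_0000003_0000/bucket_00000
    Path p = AcidUtils.createFilename(new Path("/warehouse/t/part"), options);
    System.out.println(p);
  }
}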
@@ -61,6 +61,7 @@ public boolean accept(Path path) { } }; public static final String DELTA_PREFIX = "delta_"; + public static final String DELETE_DELTA_PREFIX = "delete_delta_"; public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; public static final PathFilter deltaFileFilter = new PathFilter() { @Override @@ -68,6 +69,12 @@ public boolean accept(Path path) { return path.getName().startsWith(DELTA_PREFIX); } }; + public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(DELETE_DELTA_PREFIX); + } + }; public static final String BUCKET_PREFIX = "bucket_"; public static final PathFilter bucketFileFilter = new PathFilter() { @Override @@ -142,6 +149,23 @@ public static String deltaSubdir(long min, long max, int statementId) { return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); } + /** + * This is format of delete delta dir name prior to Hive 1.3.x + */ + public static String deleteDeltaSubdir(long min, long max) { + return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + + String.format(DELTA_DIGITS, max); + } + + /** + * Each write statement in a transaction creates its own delete delta dir, + * when split-update acid operational property is turned on. + * @since 1.3.x + */ + public static String deleteDeltaSubdir(long min, long max, int statementId) { + return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); + } + public static String baseDir(long txnId) { return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); } @@ -163,12 +187,19 @@ public static Path createFilename(Path directory, } else if(options.getStatementId() == -1) { //when minor compaction runs, we collapse per statement delta files inside a single //transaction so we no longer need a statementId in the file name - subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()); + subdir = options.isWritingDeleteDelta() ? + deleteDeltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId()) + : deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId()); } else { - subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId(), - options.getStatementId()); + subdir = options.isWritingDeleteDelta() ? 
+ deleteDeltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId(), + options.getStatementId()) + : deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId(), + options.getStatementId()); } return createBucketFile(new Path(directory, subdir), options.getBucket()); } @@ -195,11 +226,10 @@ static long parseBase(Path path) { * @return the options used to create that filename */ public static AcidOutputFormat.Options - parseBaseBucketFilename(Path bucketFile, - Configuration conf) { + parseBaseOrDeltaBucketFilename(Path bucketFile, + Configuration conf) { AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf); String filename = bucketFile.getName(); - result.writingBase(true); if (ORIGINAL_PATTERN.matcher(filename).matches()) { int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_'))); @@ -207,18 +237,37 @@ static long parseBase(Path path) { .setOldStyle(true) .minimumTransactionId(0) .maximumTransactionId(0) - .bucket(bucket); + .bucket(bucket) + .writingBase(true); } else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); - result - .setOldStyle(false) - .minimumTransactionId(0) - .maximumTransactionId(parseBase(bucketFile.getParent())) - .bucket(bucket); + if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { + result + .setOldStyle(false) + .minimumTransactionId(0) + .maximumTransactionId(parseBase(bucketFile.getParent())) + .bucket(bucket) + .writingBase(true); + } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } } else { result.setOldStyle(true).bucket(-1).minimumTransactionId(0) - .maximumTransactionId(0); + .maximumTransactionId(0) + .writingBase(true); } return result; } @@ -248,6 +297,139 @@ public static DataOperationType toDataOperationType(Operation op) { } } + public static class AcidOperationalProperties { + private int description = 0x00; + public static final int SPLIT_UPDATE_BIT = 0x01; + public static final String SPLIT_UPDATE_STRING = "split_update"; + public static final int HASH_BASED_MERGE_BIT = 0x02; + public static final String HASH_BASED_MERGE_STRING = "hash_merge"; + public static final String DEFAULT_VALUE_STRING = "default"; + public static final String LEGACY_VALUE_STRING = "legacy"; + + private AcidOperationalProperties() { + } + + /** + * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables + * that were created before ACID type system using operational properties was put in place. + * @return the acidOperationalProperties object + */ + public static AcidOperationalProperties getLegacy() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + // In legacy mode, none of these properties are turned on. + return obj; + } + + /** + * Returns an acidOperationalProperties object that represents default ACID behavior for tables + * that do no explicitly specify/override the default behavior. 
+ * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties getDefault() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + obj.setSplitUpdate(true); + obj.setHashBasedMerge(false); + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded string. + * @param propertiesStr an encoded string representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseString(String propertiesStr) { + if (propertiesStr == null) { + return AcidOperationalProperties.getLegacy(); + } + if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { + return AcidOperationalProperties.getDefault(); + } + if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { + return AcidOperationalProperties.getLegacy(); + } + AcidOperationalProperties obj = new AcidOperationalProperties(); + String[] options = propertiesStr.split("\\|"); + for (String option : options) { + switch (option) { + case SPLIT_UPDATE_STRING: + obj.setSplitUpdate(true); + break; + case HASH_BASED_MERGE_STRING: + obj.setHashBasedMerge(true); + break; + default: + break; + } + } + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer. + * @param properties an encoded 32-bit integer representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseInt(int properties) { + AcidOperationalProperties obj = new AcidOperationalProperties(); + if ((properties & SPLIT_UPDATE_BIT) > 0) { + obj.setSplitUpdate(true); + } + if ((properties & HASH_BASED_MERGE_BIT) > 0) { + obj.setHashBasedMerge(true); + } + return obj; + } + + /** + * Sets the split update property for ACID operations based on the boolean argument. + * When split update is turned on, an update ACID event is interpreted as a combination of + * a delete event followed by an insert event. + * @param isSplitUpdate a boolean property that turns on split update when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) { + description = (isSplitUpdate + ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT)); + return this; + } + + /** + * Sets the hash-based merge property for ACID operations that combines delta files using + * a GRACE hash join based approach, when turned on. (Currently unimplemented!) + * @param isHashBasedMerge a boolean property that turns on hash-based merge when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) { + description = (isHashBasedMerge + ?
(description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT)); + return this; + } + + public boolean isSplitUpdate() { + return (description & SPLIT_UPDATE_BIT) > 0; + } + + public boolean isHashBasedMerge() { + return (description & HASH_BASED_MERGE_BIT) > 0; + } + + public int toInt() { + return description; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + if (isSplitUpdate()) { + str.append("|" + SPLIT_UPDATE_STRING); + } + if (isHashBasedMerge()) { + str.append("|" + HASH_BASED_MERGE_STRING); + } + return str.toString(); + } + } + public static interface Directory { /** @@ -256,6 +438,8 @@ public static DataOperationType toDataOperationType(Operation op) { */ Path getBaseDirectory(); + List getDeltaFiles(); + /** * Get the list of original files. Not {@code null}. * @return the list of original files (eg. 000000_0) @@ -382,6 +566,35 @@ else if(statementId != parsedDelta.statementId) { List result = new ArrayList<>(deltas.size()); AcidInputFormat.DeltaMetaData last = null; for(ParsedDelta parsedDelta : deltas) { + if (!parsedDelta.getPath().getName().startsWith(DELTA_PREFIX)) { + continue; // when serializing the deltas, skip anything which is not a delta file. + } + if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { + last.getStmtIds().add(parsedDelta.getStatementId()); + continue; + } + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList()); + result.add(last); + if(parsedDelta.statementId >= 0) { + last.getStmtIds().add(parsedDelta.getStatementId()); + } + } + return result; + } + + /** + * Convert the list of delete deltas into an equivalent list of begin/end + * transaction id pairs. Assumes {@code deltas} is sorted. + * @param deleteDeltas + * @return the list of transaction ids to serialize + */ + public static List serializeDeleteDeltas(List deleteDeltas) { + List result = new ArrayList<>(deleteDeltas.size()); + AcidInputFormat.DeltaMetaData last = null; + for(ParsedDelta parsedDelta : deleteDeltas) { + if (!parsedDelta.getPath().getName().startsWith(DELETE_DELTA_PREFIX)) { + continue; // when serializing the deltas, skip anything which is not a delta file. + } if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { last.getStmtIds().add(parsedDelta.getStatementId()); continue; @@ -418,15 +631,47 @@ else if(statementId != parsedDelta.statementId) { return results.toArray(new Path[results.size()]); } - private static ParsedDelta parseDelta(FileStatus path) { - ParsedDelta p = parsedDelta(path.getPath()); + /** + * Convert the list of begin/end transaction id pairs to a list of delete delta + * directories. 
Note that there may be multiple delta files for the exact same txn range starting + * with 1.3.x; + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} + * @param root the root directory + * @param deltas list of begin/end transaction id pairs + * @return the list of delta paths + */ + public static Path[] deserializeDeleteDeltas(Path root, final List deltas) throws IOException { + List results = new ArrayList(deltas.size()); + for(AcidInputFormat.DeltaMetaData dmd : deltas) { + if(dmd.getStmtIds().isEmpty()) { + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); + continue; + } + for(Integer stmtId : dmd.getStmtIds()) { + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); + } + } + return results.toArray(new Path[results.size()]); + } + + public static ParsedDelta parsedDelta(Path deltaDir) { + String deltaDirName = deltaDir.getName(); + if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) { + return parsedDelta(deltaDir, DELETE_DELTA_PREFIX); + } + return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix + } + + private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) { + ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix); return new ParsedDelta(p.getMinTransaction(), p.getMaxTransaction(), path, p.statementId); } - public static ParsedDelta parsedDelta(Path deltaDir) { + + public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) { String filename = deltaDir.getName(); - if (filename.startsWith(DELTA_PREFIX)) { - String rest = filename.substring(DELTA_PREFIX.length()); + if (filename.startsWith(deltaPrefix)) { + String rest = filename.substring(deltaPrefix.length()); int split = rest.indexOf('_'); int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId long min = Long.parseLong(rest.substring(0, split)); @@ -440,7 +685,7 @@ public static ParsedDelta parsedDelta(Path deltaDir) { return new ParsedDelta(min, max, null, statementId); } throw new IllegalArgumentException(deltaDir + " does not start with " + - DELTA_PREFIX); + deltaPrefix); } /** @@ -456,7 +701,8 @@ public static boolean isAcid(Path directory, for(FileStatus file: fs.listStatus(directory)) { String filename = file.getPath().getName(); if (filename.startsWith(BASE_PREFIX) || - filename.startsWith(DELTA_PREFIX)) { + filename.startsWith(DELTA_PREFIX) || + filename.startsWith(DELETE_DELTA_PREFIX)) { if (file.isDir()) { return true; } @@ -481,28 +727,41 @@ public static Directory getAcidState(Path directory, private Path oldestBase = null; } + public static Directory getAcidState(Path directory, + Configuration conf, + ValidTxnList txnList, + boolean useFileIds, + boolean ignoreEmptyFiles) throws IOException { + FileSystem fs = directory.getFileSystem(conf); + return getAcidState(fs, directory, conf, txnList, useFileIds, ignoreEmptyFiles); + } + + + /** * Get the ACID state of the given directory. It finds the minimal set of * base and diff directories. Note that because major compactions don't * preserve the history, we can't use a base directory that includes a * transaction id that we must exclude. 
+ * @param fs the filesystem to be used to retrieve the file status * @param directory the partition directory to analyze * @param conf the configuration * @param txnList the list of transactions that we are reading * @return the state of the directory * @throws IOException */ - public static Directory getAcidState(Path directory, + public static Directory getAcidState(FileSystem fs, + Path directory, Configuration conf, ValidTxnList txnList, boolean useFileIds, boolean ignoreEmptyFiles ) throws IOException { - FileSystem fs = directory.getFileSystem(conf); final List deltas = new ArrayList(); List working = new ArrayList(); List originalDirectories = new ArrayList(); final List obsolete = new ArrayList(); + final List deltaFilesWithId = new ArrayList(); List childrenWithId = null; if (useFileIds) { try { @@ -517,13 +776,13 @@ public static Directory getAcidState(Path directory, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { getChildState(child.getFileStatus(), child, txnList, working, - originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles); + originalDirectories, original, deltaFilesWithId, obsolete, bestBase, ignoreEmptyFiles); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { - getChildState( - child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles); + getChildState(child, null, txnList, working, + originalDirectories, original, deltaFilesWithId, obsolete, bestBase, ignoreEmptyFiles); } } @@ -553,14 +812,18 @@ public static Directory getAcidState(Path directory, //subject to list of 'exceptions' in 'txnList' (not show in above example). long current = bestBase.txn; int lastStmtId = -1; + ParsedDelta prev = null; for(ParsedDelta next: working) { + boolean isDeltaAdded = false; if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? if (txnList.isTxnRangeValid(current+1, next.maxTransaction) != ValidTxnList.RangeResponse.NONE) { deltas.add(next); + isDeltaAdded = true; current = next.maxTransaction; lastStmtId = next.statementId; + prev = next; } } else if(next.maxTransaction == current && lastStmtId >= 0) { @@ -568,10 +831,44 @@ else if(next.maxTransaction == current && lastStmtId >= 0) { //generate multiple delta files with the same txnId range //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete deltas.add(next); + isDeltaAdded = true; + prev = next; + } + else if (prev != null && next.maxTransaction == prev.maxTransaction + && next.minTransaction == prev.minTransaction + && next.statementId == prev.statementId) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for same the txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and the delete + // deltas for the same range. That is, if we had 3 directories, delta_30_30, + // delete_delta_40_40 and delta_50_50, then running minor compaction would produce + // delta_30_50 and delete_delta_30_50. 
+ + deltas.add(next); + isDeltaAdded = true; + prev = next; } else { obsolete.add(next.path); } + if (isDeltaAdded) { + try { + List deltaHdfsFileStatuses + = SHIMS.listLocatedHdfsStatus(fs, next.getPath(), hiddenFileFilter); + deltaFilesWithId.addAll(deltaHdfsFileStatuses); + } catch (Throwable t) { + // Failed to get files with ID; using regular API + List deltaFileStatuses = + HdfsUtils.listLocatedStatus(fs, next.getPath(), hiddenFileFilter); + for (FileStatus deltaFileStatus : deltaFileStatuses) { + deltaFilesWithId.add(createOriginalObj(null, deltaFileStatus)); + } + } + } } if(bestBase.oldestBase != null && bestBase.status == null) { @@ -614,6 +911,11 @@ public Path getBaseDirectory() { public List getObsolete() { return obsolete; } + + @Override + public List getDeltaFiles() { + return deltaFilesWithId; + } }; } /** @@ -638,7 +940,8 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, - List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) { + List original, List deltaFilesWithId, + List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { @@ -662,8 +965,11 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } else { obsolete.add(child); } - } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) { - ParsedDelta delta = parseDelta(child); + } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) + && child.isDir()) { + String deltaPrefix = + (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; + ParsedDelta delta = parseDelta(child, deltaPrefix); if (txnList.isTxnRangeValid(delta.minTransaction, delta.maxTransaction) != ValidTxnList.RangeResponse.NONE) { @@ -791,4 +1097,84 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + + /** + * Sets the acidOperationalProperties in the configuration object argument. + * @param conf Mutable configuration object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties(Configuration conf, + AcidOperationalProperties properties) { + if (properties != null) { + HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt()); + } + } + + /** + * Sets the acidOperationalProperties in the map object argument. + * @param parameters Mutable map object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties( + Map parameters, AcidOperationalProperties properties) { + if (properties != null) { + parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString()); + } + } + + /** + * Returns the acidOperationalProperties for a given table. + * @param table A table object + * @return the acidOperationalProperties object for the corresponding table. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Table table) { + String transactionalProperties = table.getProperty( + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (transactionalProperties == null) { + // If the table does not define any transactional properties, we return a legacy type. 
+ return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(transactionalProperties); + } + + /** + * Returns the acidOperationalProperties for a given configuration. + * @param conf A configuration object + * @return the acidOperationalProperties object for the corresponding configuration. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) { + // If the conf does not define any transactional properties, the parseInt() should receive + // a value of zero, which will set AcidOperationalProperties to a legacy type and return that. + return AcidOperationalProperties.parseInt( + HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES)); + } + + /** + * Returns the acidOperationalProperties for a given set of properties. + * @param props A properties object + * @return the acidOperationalProperties object for the corresponding properties. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Properties props) { + String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the properties does not define any transactional properties, we return a legacy type. + return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } + + /** + * Returns the acidOperationalProperties for a given map. + * @param parameters A parameters object + * @return the acidOperationalProperties object for the corresponding map. + */ + public static AcidOperationalProperties getAcidOperationalProperties( + Map parameters) { + String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the parameters does not define any transactional properties, we return a legacy type. 
+ return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 945b828..c4b9940 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -637,6 +637,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass pushFilters(jobConf, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 63d02fb..2405dc1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; import org.apache.hadoop.hive.ql.io.BatchToRowReader; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; @@ -1034,7 +1035,7 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, + AcidUtils.Directory dirInfo = AcidUtils.getAcidState(fs, dir, context.conf, context.transactionList, useFileIds, true); Path base = dirInfo.getBaseDirectory(); // find the base files (original or new style) @@ -1775,9 +1776,18 @@ public float getProgress() throws IOException { final OrcSplit split = (OrcSplit) inputSplit; final Path path = split.getPath(); + + // Infer whether its an original file or base/delta file based on the path. + boolean isOriginal = true; + if (path.getParent().getName().startsWith(AcidUtils.BASE_PREFIX) + || path.getParent().getName().startsWith(AcidUtils.DELTA_PREFIX) + || path.getParent().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + isOriginal = false; + } + Path root; if (split.hasBase()) { - if (split.isOriginal()) { + if (isOriginal) { root = path.getParent(); } else { root = path.getParent().getParent(); @@ -1785,7 +1795,20 @@ public float getProgress() throws IOException { } else { root = path; } - final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); + + // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat. + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + + // The deltas are decided based on whether split-update has been turned on for the table or not. + // When split-update is turned off, everything in the delta_x_y/ directory should be treated + // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory + // need to be considered as delta, because files in delta_x_y/ will be processed as base files + // since they only have insert events in them. + final Path[] deltas = + acidOperationalProperties.isSplitUpdate() ? 
+ AcidUtils.deserializeDeleteDeltas(root, split.getDeltas()) + : AcidUtils.deserializeDeltas(root, split.getDeltas()); final Configuration conf = options.getConfiguration(); @@ -1805,7 +1828,7 @@ public float getProgress() throws IOException { setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); if (split.hasBase()) { - bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) + bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf) .getBucket(); OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf) .maxLength(split.getFileLength()); @@ -1821,7 +1844,7 @@ public float getProgress() throws IOException { Long.MAX_VALUE + ":"); ValidTxnList validTxnList = new ValidReadTxnList(txnString); final OrcRawRecordMerger records = - new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, + new OrcRawRecordMerger(conf, true, reader, isOriginal, bucket, validTxnList, readOptions, deltas); return new RowReader() { OrcStruct innerRecord = records.createValue(); @@ -1961,15 +1984,59 @@ private static boolean isStripeSatisfyPredicate( List original = dirInfo.getOriginalFiles(); List deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); boolean[] covered = new boolean[context.numBuckets]; - boolean isOriginal = base == null; + boolean isOriginal = (base == null) && (!original.isEmpty()); + + // Determine the transactional_properties of the table from the job conf stored in context. + // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(), + // & therefore we should be able to retrieve them here and determine appropriate behavior. + String transactional_properties = + context.conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.AcidOperationalProperties.parseString(transactional_properties); + + if (acidOperationalProperties.isSplitUpdate()) { + // If we have split-update turned on for this table, then the delta events have already been + // split into two directories- delta_x_y/ and delete_delta_x_y/. + // When you have split-update turned on, the insert events go to delta_x_y/ directory and all + // the delete events go to delete_x_y/. An update event will generate two events- + // a delete event for the old record that is put into delete_delta_x_y/, + // followed by an insert event for the updated record put into the usual delta_x_y/. + // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/ + // can be treated like base files. Hence, each of these are added to baseOrOriginalFiles list. + + List deltaFiles = dirInfo.getDeltaFiles(); + // We make a copy and do not mutate the original list. + baseOrOriginalFiles = new ArrayList(baseOrOriginalFiles); + for (HdfsFileStatusWithId deltaFile : deltaFiles) { + // All the delta_x_y files are being considered as base files for split-update. + // The path structure of the deltaFile is assumed to be ../delta_x_y/bucket_a. + Path deltaDirRoot = deltaFile.getFileStatus().getPath().getParent(); + if (deltaDirRoot.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + baseOrOriginalFiles.add(deltaFile); + } + } + + // Now, once we have considered the delta_x_y as base, we need to remove them from the + // original deltas, so that the 'deltas' only contain delete_delta_x_y directories. 
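// Illustrative sketch (not part of the patch itself): as the comment above explains, with
// split-update enabled the insert-only delta_* directories are read like base files and only
// delete_delta_* directories remain in the 'deltas' list. A standalone version of that
// partitioning, using hypothetical names and example directory names, looks like this:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitUpdatePartitionSketch {
  public static void main(String[] args) {
    List<String> dirs = Arrays.asList(
        "base_0000005",
        "delta_0000006_0000006_0000",          // insert-only: read like a base file
        "delete_delta_0000006_0000006_0000",   // delete events only: stays a delta
        "delta_0000007_0000007_0000");
    List<String> readAsBase = new ArrayList<>();
    List<String> deleteDeltas = new ArrayList<>();
    for (String d : dirs) {
      if (d.startsWith("delete_delta_")) {
        deleteDeltas.add(d);
      } else {
        readAsBase.add(d);
      }
    }
    System.out.println("read as base: " + readAsBase);
    System.out.println("delete deltas: " + deleteDeltas);
  }
}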
+ List parsedDeltas = dirInfo.getCurrentDirectories(); + List deleteParsedDeltas = new ArrayList(); + for (ParsedDelta parsedDelta : parsedDeltas) { + if (parsedDelta.getPath().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + deleteParsedDeltas.add(parsedDelta); + } + } + // Update the 'deltas' to include only the delete_deltas. + deltas = AcidUtils.serializeDeleteDeltas(deleteParsedDeltas); + } // if we have a base to work from - if (base != null || !original.isEmpty()) { + if (base != null || !original.isEmpty() || !baseOrOriginalFiles.isEmpty()) { long totalFileSize = 0; for (HdfsFileStatusWithId child : baseOrOriginalFiles) { totalFileSize += child.getFileStatus().getLen(); - AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename + AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename (child.getFileStatus().getPath(), context.conf); + opts.writingBase(true); int b = opts.getBucket(); // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. @@ -2020,6 +2087,10 @@ private static boolean isStripeSatisfyPredicate( Path bucketFile; if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); + } else if (baseDirectory.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + // This is invoked when we have split-update and delta files consisting of only + // the insert events are also considered base files. + bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); } else { isOriginal = true; bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf), diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 1a1af28..f2db813 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -25,11 +25,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.orc.impl.AcidStats; -import org.apache.orc.impl.OrcAcidUtils; -import org.apache.orc.OrcConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -42,10 +37,16 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -79,9 +80,12 @@ private static final Charset UTF8 = Charset.forName("UTF-8"); private final AcidOutputFormat.Options options; + private final AcidUtils.AcidOperationalProperties acidOperationalProperties; private final Path path; + private Path deleteEventPath; private final FileSystem fs; private Writer writer; + private Writer deleteEventWriter; private final FSDataOutputStream flushLengths; 
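// Illustrative sketch (not part of the patch itself): the deleteEventPath/deleteEventWriter
// fields added above mean that, under split-update, the updater targets two sibling bucket
// files for the same transaction and bucket - one under delta_x_y/ for insert events and one
// under delete_delta_x_y/ for delete events. The formatting below only mirrors the documented
// naming convention and is not the actual AcidUtils.createFilename code.
class SplitUpdatePathSketch {
  static String bucketFile(String deltaKind, long minTxn, long maxTxn, int stmtId, int bucket) {
    return String.format("%s_%07d_%07d_%04d/bucket_%05d", deltaKind, minTxn, maxTxn, stmtId, bucket);
  }

  public static void main(String[] args) {
    // For txn 7, statement 0, bucket 1 the two writers would target:
    System.out.println(bucketFile("delta", 7, 7, 0, 1));        // delta_0000007_0000007_0000/bucket_00001
    System.out.println(bucketFile("delete_delta", 7, 7, 0, 1)); // delete_delta_0000007_0000007_0000/bucket_00001
  }
}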
private final OrcStruct item; private final IntWritable operation = new IntWritable(); @@ -95,13 +99,16 @@ // because that is monotonically increasing to give new unique row ids. private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private final KeyIndexBuilder deleteEventIndexBuilder = new KeyIndexBuilder(); private StructField recIdField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in private StructField originalTxnField = null; // field inside recId to look for original txn in + private StructField bucketField = null; // field inside recId to look for bucket in private StructObjectInspector rowInspector; // OI for the original row private StructObjectInspector recIdInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier private LongObjectInspector origTxnInspector; // OI for the original txn inside the record + private IntObjectInspector bucketInspector; // OI for the bucket inside the record // identifer static int getOperation(OrcStruct struct) { @@ -180,8 +187,19 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; + // Initialize acidOperationalProperties based on table properties, and + // if they are not available, see if we can find it in the job configuration. + if (options.getTableProperties() != null) { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getTableProperties()); + } else { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + } this.bucket.set(options.getBucket()); this.path = AcidUtils.createFilename(path, options); + this.deleteEventWriter = null; + this.deleteEventPath = null; FileSystem fs = options.getFilesystem(); if (fs == null) { fs = path.getFileSystem(options.getConfiguration()); @@ -242,6 +260,14 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.inspector(createEventSchema(findRecId(options.getInspector(), options.getRecordIdColumn()))); this.writer = OrcFile.createWriter(this.path, writerOptions); + if (this.acidOperationalProperties.isSplitUpdate()) { + // If this is a split-update, we also open a writer that would write update/delete events to + // a separate file. This writes to a file in directory which starts with "delete_delta_..." + this.deleteEventPath = AcidUtils.createFilename(path, options.writingDeleteDelta(true)); + // Change the indexBuilder callback too for the deleteEvent file. + writerOptions.fileSystem(fs).callback(deleteEventIndexBuilder); + this.deleteEventWriter = OrcFile.createWriter(deleteEventPath, writerOptions); + } item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); item.setFieldValue(CURRENT_TRANSACTION, currentTransaction); @@ -250,6 +276,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } + @Override public String toString() { return getClass().getName() + "[" + path +"]"; } @@ -266,12 +293,13 @@ private long findRowIdOffsetForInsert() throws IOException { * Then, * 1. find the same bucket file in previous delta dir for this txn * 2. 
read the footer and get AcidStats which has insert count - * 2.1 if AcidStats.inserts>0 done + * 2.1 if AcidStats.inserts>0 add to the insert count. * else go to previous delta file * For example, consider insert/update/insert case...*/ if(options.getStatementId() <= 0) { return 0;//there is only 1 statement in this transaction (so far) } + long totalInserts = 0; for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); if(!fs.exists(matchingBucket)) { @@ -281,12 +309,10 @@ private long findRowIdOffsetForInsert() throws IOException { //no close() on Reader?! AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); if(acidStats.inserts > 0) { - return acidStats.inserts; + totalInserts += acidStats.inserts; } } - //if we got here, we looked at all delta files in this txn, prior to current statement and didn't - //find any inserts... - return 0; + return totalInserts; } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. @@ -307,6 +333,8 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { // in RecordIdentifier is transactionId, bucketId, rowId originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); + bucketField = fields.get(1); + bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector(); rowIdField = fields.get(2); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); @@ -316,7 +344,7 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { } } - private void addEvent(int operation, long currentTransaction, long rowId, Object row) + private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); @@ -334,11 +362,43 @@ else if(operation == INSERT_OPERATION) { } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); } + private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) + throws IOException { + if (operation == INSERT_OPERATION) { + // Just insert the record in the usual way, i.e., default to the simple behavior. + addSimpleEvent(operation, currentTransaction, rowId, row); + return; + } + this.operation.set(operation); + this.currentTransaction.set(currentTransaction); + Object rowValue = rowInspector.getStructFieldData(row, recIdField); + long originalTransaction = origTxnInspector.get( + recIdInspector.getStructFieldData(rowValue, originalTxnField)); + long prevRowId = rowIdInspector.get( + recIdInspector.getStructFieldData(rowValue, rowIdField)); + + if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + // A delete/update generates a delete event for the original row. + this.rowId.set(prevRowId); + this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); + item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. 
+ deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); + deleteEventWriter.addRow(item); + } + + if (operation == UPDATE_OPERATION) { + // A new row is also inserted in the usual delta file for an update event. + addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } + } + @Override public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { @@ -347,7 +407,11 @@ public void insert(long currentTransaction, Object row) throws IOException { //always true in that case rowIdOffset = findRowIdOffsetForInsert(); } - addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } else { + addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } rowCountDelta++; } @@ -355,8 +419,14 @@ public void insert(long currentTransaction, Object row) throws IOException { public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; + rowIdOffset = findRowIdOffsetForInsert(); + } + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row); + rowCountDelta++; // An update event creates a new row-id, in case of split-update. + } else { + addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } - addEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } @Override @@ -364,9 +434,12 @@ public void delete(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, -1, row); + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row); + } else { + addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row); + } rowCountDelta--; - } @Override @@ -381,6 +454,12 @@ public void flush() throws IOException { long len = writer.writeIntermediateFooter(); flushLengths.writeLong(len); OrcInputFormat.SHIMS.hflush(flushLengths); + // Flush deleteEvent writers too, if any. + if (deleteEventWriter != null && deleteEventIndexBuilder.acidStats.deletes > 0) { + len = deleteEventWriter.writeIntermediateFooter(); + flushLengths.writeLong(len); + OrcInputFormat.SHIMS.hflush(flushLengths); + } } @Override @@ -388,15 +467,46 @@ public void close(boolean abort) throws IOException { if (abort) { if (flushLengths == null) { fs.delete(path, false); + if (deleteEventPath != null) { + fs.delete(deleteEventPath, false); + } } } else { - if (writer != null) writer.close(); + if (writer != null) { + if (acidOperationalProperties.isSplitUpdate()) { + // When split-update is enabled, we can choose not to write + // any delta files when there are no inserts. In such cases only the delete_deltas + // would be written & they are closed separately below. + if (indexBuilder.acidStats.inserts > 0) { + writer.close(); // normal close, when there are inserts. + } else { + // Just remove insert delta paths, when there are no insert events. + fs.delete(path, false); + } + } else { + writer.close(); // normal close. + } + } + if (deleteEventWriter != null) { + if (deleteEventIndexBuilder.acidStats.deletes > 0) { + // Only need to write out & close the delete_delta if there have been any. 
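// Illustrative sketch (not part of the patch itself): addSplitUpdateEvent() above rewrites an
// UPDATE as a DELETE of the old row-id (written to delete_delta_*) followed by an INSERT of the
// new row (written to the ordinary delta_*), while plain INSERT and DELETE pass through
// unchanged. The enum and method below are hypothetical and only restate that dispatch:
import java.util.Arrays;
import java.util.List;

class SplitUpdateDispatchSketch {
  enum Op { INSERT, UPDATE, DELETE }

  // Returns the events actually written for one incoming operation under split-update.
  static List<String> eventsFor(Op op) {
    switch (op) {
      case INSERT: return Arrays.asList("INSERT -> delta");
      case DELETE: return Arrays.asList("DELETE -> delete_delta");
      case UPDATE: return Arrays.asList("DELETE -> delete_delta", "INSERT -> delta");
      default:     throw new IllegalArgumentException("unknown op: " + op);
    }
  }

  public static void main(String[] args) {
    System.out.println(eventsFor(Op.UPDATE)); // [DELETE -> delete_delta, INSERT -> delta]
  }
}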
+ deleteEventWriter.close(); + } else { + // Just remove delete_delta, if there have been no delete events. + fs.delete(deleteEventPath, false); + } + } + } if (flushLengths != null) { flushLengths.close(); fs.delete(OrcAcidUtils.getSideFile(path), false); + if (deleteEventPath != null) { + fs.delete(OrcAcidUtils.getSideFile(deleteEventPath), false); + } } writer = null; + deleteEventWriter = null; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 9d927bd..de3a90e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -3076,10 +3077,11 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List newFiles) throws HiveException { - // The layout for ACID files is table|partname/base|delta/bucket + // The layout for ACID files is table|partname/base|delta|delete_delta/bucket // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta/bucket. So we need to move that into the above structure. - // For the first mover there will be no delta directory, so we can move the whole directory. + // it will look like bucket/delta|delete_delta/bucket. So we need to move that into + // the above structure. For the first mover there will be no delta directory, + // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. @@ -3102,49 +3104,58 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, for (FileStatus origBucketStat : origBucketStats) { Path origBucketPath = origBucketStat.getPath(); - LOG.debug("Acid move looking for delta files in bucket " + origBucketPath); + moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter, + fs, dst, origBucketPath, createdDeltaDirs, newFiles); + moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, + fs, dst,origBucketPath, createdDeltaDirs, newFiles); + } + } + } - FileStatus[] deltaStats = null; - try { - deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter); - } catch (IOException e) { - throw new HiveException("Unable to look for delta files in original bucket " + - origBucketPath.toUri().toString(), e); - } - LOG.debug("Acid move found " + deltaStats.length + " delta files"); - - for (FileStatus deltaStat : deltaStats) { - Path deltaPath = deltaStat.getPath(); - // Create the delta directory. Don't worry if it already exists, - // as that likely means another task got to it first. Then move each of the buckets. - // it would be more efficient to try to move the delta with it's buckets but that is - // harder to make race condition proof. 
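// Illustrative sketch (not part of the patch itself): the two moveAcidDeltaFiles calls above run
// the same move routine once per directory kind, parameterised only by the prefix/path filter.
// A stripped-down, string-based version of that "move buckets under each matching delta dir"
// idea (all names and paths hypothetical):
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class MoveByPrefixSketch {
  static void move(List<String> srcDirs, String dst, Predicate<String> dirFilter) {
    for (String dir : srcDirs) {
      String name = dir.substring(dir.lastIndexOf('/') + 1);
      if (dirFilter.test(name)) {
        System.out.println("would move buckets of " + dir + " under " + dst + "/" + name);
      }
    }
  }

  public static void main(String[] args) {
    List<String> staged = Arrays.asList(
        "/tmp/stage/delta_0000009_0000009_0000",
        "/tmp/stage/delete_delta_0000009_0000009_0000");
    move(staged, "/warehouse/t", n -> n.startsWith("delta_"));        // ordinary deltas
    move(staged, "/warehouse/t", n -> n.startsWith("delete_delta_")); // delete deltas
  }
}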
- Path deltaDest = new Path(dst, deltaPath.getName()); + private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs, + Path dst, Path origBucketPath, Set createdDeltaDirs, + List newFiles) throws HiveException { + LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath); + + FileStatus[] deltaStats = null; + try { + deltaStats = fs.listStatus(origBucketPath, pathFilter); + } catch (IOException e) { + throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " + + origBucketPath.toUri().toString(), e); + } + LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files"); + + for (FileStatus deltaStat : deltaStats) { + Path deltaPath = deltaStat.getPath(); + // Create the delta directory. Don't worry if it already exists, + // as that likely means another task got to it first. Then move each of the buckets. + // it would be more efficient to try to move the delta with it's buckets but that is + // harder to make race condition proof. + Path deltaDest = new Path(dst, deltaPath.getName()); + try { + if (!createdDeltaDirs.contains(deltaDest)) { try { - if (!createdDeltaDirs.contains(deltaDest)) { - try { - fs.mkdirs(deltaDest); - createdDeltaDirs.add(deltaDest); - } catch (IOException swallowIt) { - // Don't worry about this, as it likely just means it's already been created. - LOG.info("Unable to create delta directory " + deltaDest + - ", assuming it already exists: " + swallowIt.getMessage()); - } - } - FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); - LOG.debug("Acid move found " + bucketStats.length + " bucket files"); - for (FileStatus bucketStat : bucketStats) { - Path bucketSrc = bucketStat.getPath(); - Path bucketDest = new Path(deltaDest, bucketSrc.getName()); - LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + - bucketDest.toUri().toString()); - fs.rename(bucketSrc, bucketDest); - if (newFiles != null) newFiles.add(bucketDest); - } - } catch (IOException e) { - throw new HiveException("Error moving acid files " + e.getMessage(), e); + fs.mkdirs(deltaDest); + createdDeltaDirs.add(deltaDest); + } catch (IOException swallowIt) { + // Don't worry about this, as it likely just means it's already been created. 
+ LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest + + ", assuming it already exists: " + swallowIt.getMessage()); } } + FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); + LOG.debug("Acid move found " + bucketStats.length + " bucket files"); + for (FileStatus bucketStat : bucketStats) { + Path bucketSrc = bucketStat.getPath(); + Path bucketDest = new Path(deltaDest, bucketSrc.getName()); + LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + + bucketDest.toUri().toString()); + fs.rename(bucketSrc, bucketDest); + if (newFiles != null) newFiles.add(bucketDest); + } + } catch (IOException e) { + throw new HiveException("Error moving acid files " + e.getMessage(), e); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 8cf261d..0353ef6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -101,6 +101,8 @@ private boolean isAcidTable; + private AcidUtils.AcidOperationalProperties acidOperationalProperties; + private transient TableSample tableSample; private transient Table tableMetadata; @@ -127,6 +129,11 @@ public TableScanDesc(final String alias, List vcs, Table tblMetad this.virtualCols = vcs; this.tableMetadata = tblMetadata; isAcidTable = AcidUtils.isAcidTable(this.tableMetadata); + if (isAcidTable) { + acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata); + } else { + acidOperationalProperties = null; + } } @Override @@ -159,6 +166,10 @@ public boolean isAcidTable() { return isAcidTable; } + public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() { + return acidOperationalProperties; + } + @Explain(displayName = "Output", explainLevels = { Level.USER }) public List getOutputColumnNames() { return this.neededColumns; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 6caca98..39324b2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,9 +17,16 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.hive.common.ValidCompactorTxnList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Matcher; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -27,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StringableMap; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -61,12 +69,8 @@ import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.util.StringUtils; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.*; -import java.util.regex.Matcher; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; /** * Class to do compactions via an MR job. This has to be in the ql package rather than metastore @@ -128,7 +132,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); - overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable + overrideMRProps(job, t.getParameters()); if (ci.properties != null) { // override MR properties and general tblproperties if applicable overrideTblProps(job, t.getParameters(), ci.properties); } @@ -137,6 +141,11 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter //to do the final move job.setBoolean("mapreduce.map.speculative", false); + + // Set appropriate Acid readers/writers based on the table properties. + AcidUtils.setAcidOperationalProperties(job, + AcidUtils.getAcidOperationalProperties(t.getParameters())); + return job; } @@ -494,6 +503,8 @@ public String toString() { @Override public InputSplit[] getSplits(JobConf entries, int i) throws IOException { Path baseDir = null; + AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(entries); if (entries.get(BASE_DIR) != null) baseDir = new Path(entries.get(BASE_DIR)); StringableList tmpDeltaDirs = new StringableList(entries.get(DELTA_DIRS)); Path[] deltaDirs = tmpDeltaDirs.toArray(new Path[tmpDeltaDirs.size()]); @@ -502,23 +513,56 @@ public String toString() { for (Path dir : dirsToSearch) { FileSystem fs = dir.getFileSystem(entries); - // If this is a base or delta directory, then we need to be looking for the bucket files. - // But if it's a legacy file then we need to add it directly. - if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || - dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { - boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); - FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); - for(FileStatus f : files) { - // For each file, figure out which bucket it is. - Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); - addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap); + if (acidOperationalProperties.isSplitUpdate()) { + // When we have split-update and there are two kinds of delta directories- + // the delta_x_y/ directory one which has only insert events and + // the delete_delta_x_y/ directory which has only the delete events. + // The clever thing about this kind of splitting is that everything in the delta_x_y/ + // directory can be processed as base files. However, this is left out currently + // as an improvement for the future. + + if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || + dir.getName().startsWith(AcidUtils.DELTA_PREFIX) || + dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + boolean sawBase = + dir.getName().startsWith(AcidUtils.BASE_PREFIX) ? true : false; + + FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); + for(FileStatus f : files) { + // For each file, figure out which bucket it is. 
+ Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); + addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap); + } + } else { + // Legacy file, see if it's a bucket file + Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName()); + addFileToMap(matcher, dir, true, splitToBucketMap); } + } else { - // Legacy file, see if it's a bucket file - Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName()); - addFileToMap(matcher, dir, true, splitToBucketMap); + // There is only kind of delta directory and therefore all the delta files are treated + // the usual way. + + // If this is a base or delta directory, then we need to be looking for the bucket files. + // But if it's a legacy file then we need to add it directly. + if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || + dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); + FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); + for(FileStatus f : files) { + // For each file, figure out which bucket it is. + Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); + addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap); + } + } else { + // Legacy file, see if it's a bucket file + Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName()); + addFileToMap(matcher, dir, true, splitToBucketMap); + } } } + + List splits = new ArrayList(splitToBucketMap.size()); for (Map.Entry e : splitToBucketMap.entrySet()) { BucketTracker bt = e.getValue(); @@ -613,7 +657,8 @@ public float getProgress() throws IOException { implements Mapper { JobConf jobConf; - RecordWriter writer; + RecordWriter writer = null; + RecordWriter deleteEventWriter = null; @Override public void map(WritableComparable key, CompactorInputSplit split, @@ -636,10 +681,30 @@ public void map(WritableComparable key, CompactorInputSplit split, RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); getWriter(reporter, reader.getObjectInspector(), split.getBucket()); + + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(jobConf); + + if (!isMajor && acidOperationalProperties.isSplitUpdate()) { + // When split-update is enabled for ACID, we initialize a separate deleteEventWriter + // that is used to write all the delete events (in case of minor compaction only). For major + // compaction, history is not required to be maintained hence the delete events are processed + // but not re-written separately. + getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket()); + } + while (reader.next(identifier, value)) { - if (isMajor && reader.isDelete(value)) continue; - writer.write(value); - reporter.progress(); + boolean sawDeleteRecord = reader.isDelete(value); + if (isMajor && sawDeleteRecord) continue; + if (!isMajor && sawDeleteRecord && acidOperationalProperties.isSplitUpdate()) { + // When minor compacting, write delete events to a separate file when split-update is + // turned on. 
+ deleteEventWriter.write(value); + reporter.progress(); + } else { + writer.write(value); + reporter.progress(); + } } } @@ -653,6 +718,9 @@ public void close() throws IOException { if (writer != null) { writer.close(false); } + if (deleteEventWriter != null) { + deleteEventWriter.close(false); + } } private void getWriter(Reporter reporter, ObjectInspector inspector, @@ -679,6 +747,30 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, } } + private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, + int bucket) throws IOException { + if (deleteEventWriter == null) { + AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); + options.inspector(inspector) + .writingBase(false) + .writingDeleteDelta(true) // this is the option which will make it a delete writer + .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) + .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) + .reporter(reporter) + .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(bucket) + .statementId(-1);//setting statementId == -1 makes compacted delta files use + //delta_xxxx_yyyy format + + // Instantiate the underlying output format + @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class + AcidOutputFormat aof = + instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); + + deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); + } + } } static class StringableList extends ArrayList { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index d48e441..53c0fd7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -18,6 +18,17 @@ package org.apache.hadoop.hive.ql; +import java.io.File; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -36,9 +47,9 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService; @@ -52,20 +63,9 @@ import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TestName; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * TODO: this should be 
merged with TestTxnCommands once that is checked in * specifically the tests; the supporting code here is just a clone of TestTxnCommands @@ -80,6 +80,10 @@ private static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private HiveConf hiveConf; private Driver d; private static enum Table { @@ -703,6 +707,474 @@ public void testNonAcidToAcidConversion3() throws Exception { Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } + /** + * In current implementation of ACID, altering the value of transactional_properties or trying to + * set a value for previously unset value for an acid table will throw an exception. + * @throws Exception + */ + @Test + public void testFailureOnAlteringTransactionalProperties() throws Exception { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); + runStatementOnDriver("alter table " + Table.ACIDTBL + " SET TBLPROPERTIES ('transactional_properties' = 'default')"); + } + + /** + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table with split-update enabled + * 3. Insert a row to ACID table + * 4. Perform Major compaction + * 5. Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidSplitUpdateConversion1() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. 
Insert another row to newly-converted ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. + // The delta directory should also have only 1 bucket file (bucket_00001) + Assert.assertEquals(3, status.length); + boolean sawNewDelta = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, buckets.length); // only one bucket file + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertTrue(sawNewDelta); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_xxxxxxx. + // Original bucket files and delta directory should stay until Cleaner kicks in. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(4, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Let Cleaner delete obsolete files/dirs + // Note, here we create a fake directory along with fake files as original directories/files + String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + + "/subdir/000000_0"; + String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + + "/subdir/000000_1"; + fs.create(new Path(fakeFile0)); + fs.create(new Path(fakeFile1)); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 5 items: + // 2 original files, 1 original directory, 1 base directory and 1 delta directory + Assert.assertEquals(5, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + // Original bucket files and delta directory should have been cleaned up. 
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } + + /** + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table with split update enabled. + * 3. Update the existing row in ACID table + * 4. Perform Major compaction + * 5. Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidSplitUpdateConversion2() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. Update the existing row in newly-converted ACID table + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory + // and one delete_delta directory. 
When split-update is enabled, an update event is split into + // a combination of delete and insert, that generates the delete_delta directory. + // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001) + // and so should the delete_delta directory. + Assert.assertEquals(4, status.length); + boolean sawNewDelta = false; + boolean sawNewDeleteDelta = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + } else if (status[i].getPath().getName().matches("delete_delta_.*")) { + sawNewDeleteDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertTrue(sawNewDelta); + Assert.assertTrue(sawNewDeleteDelta); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_0000001. + // Original bucket files and delta directory should stay until Cleaner kicks in. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(5, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Let Cleaner delete obsolete files/dirs + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 5 items: + // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory + Assert.assertEquals(5, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_0000001. + // Original bucket files, delta directory and delete_delta should have been cleaned up. 
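// Illustrative sketch (not part of the patch itself): the assertions above check the table
// directory layout stage by stage (original 00000N_0 files, then delta_*/delete_delta_* after
// the update, then base_* after major compaction, then only base_* once the Cleaner runs).
// A hypothetical helper that tallies entries by prefix could express those checks compactly:
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LayoutTallySketch {
  static Map<String, Integer> tally(List<String> names) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String n : names) {
      String kind = n.startsWith("base_") ? "base"
          : n.startsWith("delete_delta_") ? "delete_delta"
          : n.startsWith("delta_") ? "delta"
          : "original";
      counts.merge(kind, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // A layout of the kind asserted after the UPDATE in step 3 above (directory names illustrative):
    System.out.println(tally(Arrays.asList(
        "000000_0", "000001_0",
        "delta_0000001_0000001_0000", "delete_delta_0000001_0000001_0000")));
    // => {original=2, delta=1, delete_delta=1}
  }
}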
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with split-update
+   * enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split-update enabled
+   * 3. Perform Major compaction
+   * 4. Update the existing row and insert a new row to ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion3() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL +
+        " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be the same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
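+    // (base_-9223372036854775808 is base_<Long.MIN_VALUE>: the only input to this compaction is
+    // the original non-ACID bucket files, which carry no transaction ids, so there is no maximum
+    // txn id from which to derive the base name.)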
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(3, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Update the existing row, and insert another row to newly-converted ACID table + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 + // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, + // plus two new delta directories and one delete_delta directory that would be created due to + // the update statement (remember split-update U=D+I)! + Assert.assertEquals(6, status.length); + int numDelta = 0; + int numDeleteDelta = 0; + sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + numDelta++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numDelta == 1) { + Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else if (numDelta == 2) { + Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } else if (status[i].getPath().getName().matches("delete_delta_.*")) { + numDeleteDelta++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numDeleteDelta == 1) { + Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } else if (status[i].getPath().getName().matches("base_.*")) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertEquals(2, 
numDelta);
+    Assert.assertEquals(1, numDeleteDelta);
+    Assert.assertTrue(sawNewBase);
+
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000002
+    // Original bucket files, delta directories, delete_delta directories and the
+    // previous base directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);
+    Assert.assertEquals(7, status.length);
+    int numBase = 0;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        numBase++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numBase == 1) {
+          Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numBase == 2) {
+          // The new base dir has a single bucket file (bucket_00001), since the delta dirs it
+          // was compacted from only contain bucket_00001
+          Assert.assertEquals("base_0000002", status[i].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      }
+    }
+    Assert.assertEquals(2, numBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 7 items:
+    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+    Assert.assertEquals(7, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000002.
+    // Original bucket files, delta directories, the delete_delta directory and the previous
+    // base directory should have been cleaned up.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertEquals("base_0000002", status[0].getPath().getName()); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } + @Test public void testValidTxnsBookkeeping() throws Exception { // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf @@ -831,11 +1303,18 @@ private static void checkCompactionState(CompactionsByState expected, Compaction */ @Test public void testInitiatorWithMultipleFailedCompactions() throws Exception { + // Test with split-update turned off. + testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'"); + // Test with split-update turned on. + testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); + } + + private void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception { String tblName = "hive12353"; runStatementOnDriver("drop table if exists " + tblName); runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4); for(int i = 0; i < 5; i++) { //generate enough delta files so that Initiator can trigger auto compaction @@ -1046,11 +1525,18 @@ public static void runHouseKeeperService(HouseKeeperService houseKeeperService, */ @Test public void writeBetweenWorkerAndCleaner() throws Exception { + // Test with split-update turned off. + writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'"); + // Test with split-update turned on. + writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); + } + + private void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception { String tblName = "hive12352"; runStatementOnDriver("drop table if exists " + tblName); runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); //create some data runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')"); @@ -1097,7 +1583,6 @@ public void writeBetweenWorkerAndCleaner() throws Exception { Assert.assertEquals("", expected, runStatementOnDriver("select a,b from " + tblName + " order by a")); } - /** * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found. * When a heartbeat fails, the query should be failed too. 
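The directory names asserted in the tests above follow directly from the split-update model: an UPDATE is rewritten as an insert plus a delete, and the two halves land in a delta and a delete_delta directory with the same transaction range. The following sketch is illustrative only and not part of the patch; it uses the deleteDeltaSubdir helper exercised later in this diff and assumes the pre-existing three-argument AcidUtils.deltaSubdir counterpart.

// Illustrative sketch only: derive the directory pair that a single UPDATE statement
// produces under split-update ('transactional_properties'='default').
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class SplitUpdateLayoutSketch {
  public static void main(String[] args) {
    long txnId = 1;       // transaction that ran the UPDATE
    int statementId = 0;  // first statement within that transaction
    // The insert half of the update goes to the regular delta directory ...
    System.out.println(AcidUtils.deltaSubdir(txnId, txnId, statementId));
    // ... which prints delta_0000001_0000001_0000, while the delete half goes to the
    // delete_delta directory introduced by this patch:
    System.out.println(AcidUtils.deleteDeltaSubdir(txnId, txnId, statementId));
    // ... which prints delete_delta_0000001_0000001_0000, matching the layout asserted
    // in testNonAcidToAcidSplitUpdateConversion3 above.
  }
}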
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index b83cea4..d7cfdd5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -17,11 +17,20 @@ */ package org.apache.hadoop.hive.ql.io; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem; @@ -30,13 +39,6 @@ import org.junit.Assert; import org.junit.Test; -import java.io.IOException; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class TestAcidUtils { @Test @@ -60,12 +62,23 @@ public void testCreateFilename() throws Exception { options.writingBase(false); assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023", AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(true); + assertEquals("/tmp/delete_delta_0000100_0000200_0000/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(false); options.statementId(-1); assertEquals("/tmp/delta_0000100_0000200/bucket_00023", AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(true); + assertEquals("/tmp/delete_delta_0000100_0000200/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(false); options.statementId(7); assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023", AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(true); + assertEquals("/tmp/delete_delta_0000100_0000200_0007/bucket_00023", + AcidUtils.createFilename(p, options).toString()); } @Test public void testCreateFilenameLargeIds() throws Exception { @@ -86,7 +99,6 @@ public void testCreateFilenameLargeIds() throws Exception { assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023", AcidUtils.createFilename(p, options).toString()); } - @Test public void testParsing() throws Exception { @@ -94,19 +106,34 @@ public void testParsing() throws Exception { Path dir = new Path("/tmp/tbl"); Configuration conf = new Configuration(); AcidOutputFormat.Options opts = - AcidUtils.parseBaseBucketFilename(new Path(dir, "base_567/bucket_123"), + AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"), conf); assertEquals(false, opts.getOldStyle()); assertEquals(true, opts.isWritingBase()); assertEquals(567, opts.getMaximumTransactionId()); assertEquals(0, opts.getMinimumTransactionId()); assertEquals(123, opts.getBucket()); - opts = AcidUtils.parseBaseBucketFilename(new Path(dir, "000123_0"), conf); + opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delta_000005_000006/bucket_00001"), + conf); + assertEquals(false, opts.getOldStyle()); + assertEquals(false, opts.isWritingBase()); + assertEquals(6, 
opts.getMaximumTransactionId()); + assertEquals(5, opts.getMinimumTransactionId()); + assertEquals(1, opts.getBucket()); + opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delete_delta_000005_000006/bucket_00001"), + conf); + assertEquals(false, opts.getOldStyle()); + assertEquals(false, opts.isWritingBase()); + assertEquals(6, opts.getMaximumTransactionId()); + assertEquals(5, opts.getMinimumTransactionId()); + assertEquals(1, opts.getBucket()); + opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf); assertEquals(true, opts.getOldStyle()); assertEquals(true, opts.isWritingBase()); assertEquals(123, opts.getBucket()); assertEquals(0, opts.getMinimumTransactionId()); assertEquals(0, opts.getMaximumTransactionId()); + } @Test @@ -470,5 +497,228 @@ public void deltasWithOpenTxnsNotInCompact2() throws Exception { assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); } + @Test + public void testBaseWithDeleteDeltas() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 1); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_49/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_025_025/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delta_029_029/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_029_029/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delta_025_030/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_025_030/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delta_050_105/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_050_105/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0])); + AcidUtils.Directory dir = + AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs, + "mock:/tbl/part1"), conf, new ValidReadTxnList("100:")); + assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); + List obsolete = dir.getObsolete(); + assertEquals(7, obsolete.size()); + assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).getPath().toString()); + assertEquals(0, dir.getOriginalFiles().size()); + List deltas = dir.getCurrentDirectories(); + assertEquals(2, deltas.size()); + assertEquals("mock:/tbl/part1/delete_delta_050_105", deltas.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_050_105", deltas.get(1).getPath().toString()); + // The delete_delta_110_110 should not be read because it is greater than the high watermark. 
+  }
+
+  @Test
+  public void testOverlappingDeltaAndDeleteDelta() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 1);
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_0000063_63/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_000062_62/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_00061_61/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_00064_64/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_052_55/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+    assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+    List obsolete = dir.getObsolete();
+    assertEquals(3, obsolete.size());
+    assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).getPath().toString());
+    List delts = dir.getCurrentDirectories();
+    assertEquals(6, delts.size());
+    assertEquals("mock:/tbl/part1/delete_delta_40_60", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_40_60", delts.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_00061_61", delts.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_00064_64", delts.get(5).getPath().toString());
+  }
+
+  @Test
+  public void testMinorCompactedDeltaMakesInBetweenDeleteDeltaObsolete() throws Exception {
+    // This test checks that a minor compacted delta covering the txn range [40,60]
+    // makes any delete_delta within that range obsolete.
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_50_50/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+    List obsolete = dir.getObsolete();
+    assertEquals(1, obsolete.size());
+    assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).getPath().toString());
+    List delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
+  }
+
+  @Test
+  public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception {
+    // This test checks that the appropriate delta and delete_delta directories are included
+    // when a minor compaction specifies a valid open txn range.
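+    // (ValidCompactorTxnList("100:4") below treats txn 4 as still open, so the compactor only
+    // picks up delta/delete_delta directories whose entire range lies below txn 4.)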
+ Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidUtils.AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_2_2/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500, + new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_7_7/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = + AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4")); + List delts = dir.getCurrentDirectories(); + assertEquals(2, delts.size()); + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_2_2", delts.get(1).getPath().toString()); + } + + @Test + public void deleteDeltasWithOpenTxnInRead() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidUtils.AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_3_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4")); + List delts = dir.getCurrentDirectories(); + assertEquals(3, delts.size()); + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_2_5", delts.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_2_5", delts.get(2).getPath().toString()); + // Note that delete_delta_3_3 should not be read, when a minor compacted + // [delete_]delta_2_5 is present. 
+ } + + @Test + public void testDeleteDeltaSubdirPathGeneration() throws Exception { + String deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10); + assertEquals("delete_delta_0000001_0000010", deleteDeltaSubdirPath); + deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10, 5); + assertEquals("delete_delta_0000001_0000010_0005", deleteDeltaSubdirPath); + } + + @Test + public void testDeleteEventDeltaDirPathFilter() throws Exception { + Path positivePath = new Path("delete_delta_000001_000010"); + Path negativePath = new Path("delta_000001_000010"); + assertEquals(true, AcidUtils.deleteEventDeltaDirFilter.accept(positivePath)); + assertEquals(false, AcidUtils.deleteEventDeltaDirFilter.accept(negativePath)); + } + + @Test + public void testAcidOperationalProperties() throws Exception { + AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy(); + assertsForAcidOperationalProperties(testObj, "legacy"); + + testObj = AcidUtils.AcidOperationalProperties.getDefault(); + assertsForAcidOperationalProperties(testObj, "default"); + + testObj = AcidUtils.AcidOperationalProperties.parseInt(0); + assertsForAcidOperationalProperties(testObj, "legacy"); -} + testObj = AcidUtils.AcidOperationalProperties.parseInt(1); + assertsForAcidOperationalProperties(testObj, "split_update"); + + testObj = AcidUtils.AcidOperationalProperties.parseString("legacy"); + assertsForAcidOperationalProperties(testObj, "legacy"); + + testObj = AcidUtils.AcidOperationalProperties.parseString("default"); + assertsForAcidOperationalProperties(testObj, "default"); + + } + + private void assertsForAcidOperationalProperties(AcidUtils.AcidOperationalProperties testObj, + String type) throws Exception { + switch(type) { + case "split_update": + case "default": + assertEquals(true, testObj.isSplitUpdate()); + assertEquals(false, testObj.isHashBasedMerge()); + assertEquals(1, testObj.toInt()); + assertEquals("|split_update", testObj.toString()); + break; + case "legacy": + assertEquals(false, testObj.isSplitUpdate()); + assertEquals(false, testObj.isHashBasedMerge()); + assertEquals(0, testObj.toInt()); + assertEquals("", testObj.toString()); + break; + default: + break; + } + } + + @Test + public void testAcidOperationalPropertiesSettersAndGetters() throws Exception { + AcidUtils.AcidOperationalProperties oprProps = AcidUtils.AcidOperationalProperties.getDefault(); + Configuration testConf = new Configuration(); + // Test setter for configuration object. + AcidUtils.setAcidOperationalProperties(testConf, oprProps); + assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0)); + // Test getter for configuration object. + assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString()); + + Map parameters = new HashMap(); + // Test setter for map object. + AcidUtils.setAcidOperationalProperties(parameters, oprProps); + assertEquals(oprProps.toString(), + parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname)); + // Test getter for map object. + // Calling a get on the 'parameters' will still return legacy type because the setters/getters + // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES + // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES. 
+ assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt()); + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString()); + // Set the appropriate key in the map and test that we are able to read it back correctly. + assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt()); + } +} \ No newline at end of file
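As a closing illustration of how these pieces fit together, here is a minimal sketch. It is not part of the patch and relies only on the methods exercised in the tests above (setAcidOperationalProperties, getAcidOperationalProperties, getDefault and isSplitUpdate); any other naming is illustrative.

// Illustrative sketch only: decide whether to expect delete_delta directories,
// based on hive.txn.operational.properties (0 == legacy, 1 == split_update).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidOperationalPropertiesSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A writer-side component tags the configuration with the table's operational properties.
    AcidUtils.setAcidOperationalProperties(conf, AcidUtils.AcidOperationalProperties.getDefault());

    // A reader-side component recovers the flags and adjusts its expectations accordingly.
    AcidUtils.AcidOperationalProperties props = AcidUtils.getAcidOperationalProperties(conf);
    if (props.isSplitUpdate()) {
      System.out.println("split-update: expect delete_delta_* directories alongside delta_*");
    } else {
      System.out.println("legacy ACID: update/delete events live inside delta_* directories");
    }
  }
}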