diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 70816bd..b68391b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,7 +18,32 @@ package org.apache.hadoop.hive.conf; -import com.google.common.base.Joiner; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URL; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.security.auth.login.LoginException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -42,27 +67,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; - -import java.io.*; -import java.net.URI; -import java.net.URL; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.base.Joiner; /** * Hive Configuration. @@ -261,6 +266,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, HiveConf.ConfVars.HIVE_TXN_MANAGER, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, @@ -1762,6 +1768,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0, + "Sets the operational properties that control the appropriate behavior for various " + + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode for " + + "ACID, while setting it to one will enable a split-update feature found in the newer " + + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property " + + "for future versions of ACID. (See HIVE-14035 for details.)"), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. 
If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + "rejected, until this number goes below the limit."), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 14f7316..b970153 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -133,6 +133,9 @@ public void configureInputJobProperties(TableDesc tableDesc, boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties); AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable); + AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(tableProperties); + AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties); } } catch (IOException e) { throw new IllegalStateException("Failed to set output path", e); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 974c6b8..b8615cb 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Properties; public abstract class AbstractRecordWriter implements RecordWriter { @@ -265,10 +266,16 @@ public void closeBatch() throws StreamingIOFailure { private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { + // Initialize table properties from the table parameters. This is required because the table + // may define certain table parameters that may be required while writing. The table parameter + // 'transactional_properties' is one such example. 
+ Properties tblProperties = new Properties(); + tblProperties.putAll(tbl.getParameters()); return outf.getRecordUpdater(partitionPath, new AcidOutputFormat.Options(conf) .inspector(getSerde().getObjectInspector()) .bucket(bucketId) + .tableProperties(tblProperties) .minimumTransactionId(minTxnId) .maximumTransactionId(maxTxnID) .statementId(-1) diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 731caa8..f81752f 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -17,6 +17,19 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -68,20 +81,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.TimeUnit; - /** */ public class TestCompactor { @@ -857,6 +856,297 @@ public void majorCompactAfterAbort() throws Exception { } } + @Test + public void majorCompactWhileStreamingForSplitUpdate() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true', " + + "'transactional_properties'='default') ", driver); // this turns on split-update U=D+I + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + } + String name = stat[0].getPath().getName(); + Assert.assertEquals(name, "base_0000004"); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); + } finally { + connection.close(); + } + } + + @Test + public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3 + executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver); + + // Now, compact -> Compaction produces a single range for both delta and delete delta + // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3 + // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3. + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + // Verify that we have got correct set of deltas. 
+ FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[stat.length]; + Path minorCompactedDelta = null; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = stat[i].getPath().getName(); + if (deltas[i].equals("delta_0000001_0000003")) { + minorCompactedDelta = stat[i].getPath(); + } + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"}; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L); + + // Verify that we have got correct set of delete_deltas. + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L); + } + + @Test + public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); + + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2 + executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); + + // Now, compact + // One important thing to note in this test is that minor compaction always produces + // delta_x_y and a counterpart delete_delta_x_y, even when there are no delete_delta events. + // Such a choice has been made to simplify processing of AcidUtils.getAcidState(). 
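// [Editor's note, illustrative sketch only, not part of this patch.] Concretely, the assertions below
// expect the following layout after this minor compaction (the pre-compaction insert deltas are still
// listed because cleaning happens in a separate step):
//   delta_0000001_0000001_0000, delta_0000002_0000002_0000   <- original insert deltas
//   delta_0000001_0000002                                    <- minor-compacted insert delta
//   delete_delta_0000001_0000002                             <- minor-compacted (empty) delete delta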
+ + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + + // Verify that we have got correct set of deltas. + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] deltas = new String[stat.length]; + Path minorCompactedDelta = null; + for (int i = 0; i < deltas.length; i++) { + deltas[i] = stat[i].getPath().getName(); + if (deltas[i].equals("delta_0000001_0000002")) { + minorCompactedDelta = stat[i].getPath(); + } + } + Arrays.sort(deltas); + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"}; + if (!Arrays.deepEquals(expectedDeltas, deltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); + } + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L); + + // Verify that we have got correct set of delete_deltas. + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // There should be no rows in the delete_delta because there have been no delete events. + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + } + + @Test + public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { + String dbName = "default"; + String tblName = "cws"; + List colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + + "'transactional_properties'='default')", driver); + + HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); + try { + // Write a couple of batches + for (int i = 0; i < 2; i++) { + writeBatch(connection, writer, false); + } + + // Start a third batch, but don't close it. 
+ writeBatch(connection, writer, true); + + // Now, compact + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(true); + AtomicBoolean looped = new AtomicBoolean(); + t.init(stop, looped); + t.run(); + + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); + String[] names = new String[stat.length]; + Path resultFile = null; + for (int i = 0; i < names.length; i++) { + names[i] = stat[i].getPath().getName(); + if (names[i].equals("delta_0000001_0000004")) { + resultFile = stat[i].getPath(); + } + } + Arrays.sort(names); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L); + + // Verify that we have got correct set of delete_deltas also + FileStatus[] deleteDeltaStat = + fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter); + String[] deleteDeltas = new String[deleteDeltaStat.length]; + Path minorCompactedDeleteDelta = null; + for (int i = 0; i < deleteDeltas.length; i++) { + deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); + if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) { + minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); + } + } + Arrays.sort(deleteDeltas); + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"}; + if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { + Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + } + // There should be no rows in the delete_delta because there have been no delete events. + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L); + + } finally { + connection.close(); + } + } + /** * Users have the choice of specifying compaction related tblproperties either in CREATE TABLE * statement or in ALTER TABLE .. COMPACT statement. This tests both cases. 
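// [Editor's note: illustrative sketch only, not part of this patch.] The tests above rely on two
// additions made further down in AcidUtils.java. A caller could use them roughly as follows to detect
// split-update mode from the table parameters and to list delete-delta directories (the table, fs and
// path variables here are hypothetical):
//
//   AcidUtils.AcidOperationalProperties props =
//       AcidUtils.getAcidOperationalProperties(table.getParameters());
//   if (props.isSplitUpdate()) {
//     FileStatus[] deleteDeltas = fs.listStatus(
//         new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
//     // with split-update, delete events live in delete_delta_* dirs, separate from insert deltas
//   }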
diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index a2e35b8..872c0f3 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -1475,5 +1475,6 @@ const string FILE_OUTPUT_FORMAT = "file.outputformat", const string META_TABLE_STORAGE = "storage_handler", const string TABLE_IS_TRANSACTIONAL = "transactional", const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction", +const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties", diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp index f982bf2..1cbd176 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp @@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() { TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } }}} // namespace diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h index ae14bd1..3d068c3 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h @@ -38,6 +38,7 @@ class hive_metastoreConstants { std::string META_TABLE_STORAGE; std::string TABLE_IS_TRANSACTIONAL; std::string TABLE_NO_AUTO_COMPACT; + std::string TABLE_TRANSACTIONAL_PROPERTIES; }; extern const hive_metastoreConstants g_hive_metastore_constants; diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index 5a666f2..8de8896 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -82,4 +82,6 @@ public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index d6f7f49..2f9cc9b 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -18865,6 +18865,7 @@ final class Constant extends \Thrift\Type\TConstant { static protected $META_TABLE_STORAGE; static protected $TABLE_IS_TRANSACTIONAL; static protected $TABLE_NO_AUTO_COMPACT; + static protected $TABLE_TRANSACTIONAL_PROPERTIES; static protected function init_DDL_TIME() { return "transient_lastDdlTime"; @@ -18957,6 +18958,10 @@ final class Constant extends \Thrift\Type\TConstant { static protected function init_TABLE_NO_AUTO_COMPACT() { return "no_auto_compaction"; } + + static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() { + return "transactional_properties"; + } } diff --git metastore/src/gen/thrift/gen-py/hive_metastore/constants.py metastore/src/gen/thrift/gen-py/hive_metastore/constants.py index d1c07a5..5100236 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/constants.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/constants.py @@ -32,3 +32,4 @@ META_TABLE_STORAGE = "storage_handler" TABLE_IS_TRANSACTIONAL = "transactional" TABLE_NO_AUTO_COMPACT = "no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = 
"transactional_properties" diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb index eeccc84..6aa7143 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb @@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional" TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 3e74675..acaa3da 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -17,25 +17,37 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -final class TransactionalValidationListener extends MetaStorePreEventListener { +public final class TransactionalValidationListener extends MetaStorePreEventListener { public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class); + // Values mirrored from org.apache.hadoop.hive.ql.io.AcidUtils. + // We could have imported the constants but that would create a cyclic dependency + // between hive.metastore and hive.ql, hence is duplicated to avoid that. + public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default"; + public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy"; + TransactionalValidationListener(Configuration conf) { super(conf); } + @Override public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException, InvalidOperationException { switch (context.getEventType()) { @@ -60,6 +72,8 @@ private void handle(PreCreateTableEvent context) throws MetaException { /** * once a table is marked transactional, you cannot go back. Enforce this. + * Also in current version, 'transactional_properties' of the table cannot be altered after + * the table is created. Any attempt to alter it will throw a MetaException. 
*/ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throws MetaException { Table newTable = context.getNewTable(); @@ -70,12 +84,22 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw Set<String> keys = new HashSet<>(parameters.keySet()); String transactionalValue = null; boolean transactionalValuePresent = false; + boolean isTransactionalPropertiesPresent = false; + String transactionalPropertiesValue = null; + boolean hasValidTransactionalValue = false; + for (String key : keys) { if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { transactionalValuePresent = true; transactionalValue = parameters.get(key); parameters.remove(key); } + if(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + isTransactionalPropertiesPresent = true; + transactionalPropertiesValue = parameters.get(key); + // Do not remove the parameter yet, because we have a separate initialization routine + // that will use it down below. + } } if (transactionalValuePresent) { //normalize prop name @@ -91,24 +115,52 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() + " cannot be declared transactional because it's an external table"); } - - return; + hasValidTransactionalValue = true; } + Table oldTable = context.getOldTable(); String oldTransactionalValue = null; + String oldTransactionalPropertiesValue = null; for (String key : oldTable.getParameters().keySet()) { if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) { oldTransactionalValue = oldTable.getParameters().get(key); } + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + oldTransactionalPropertiesValue = oldTable.getParameters().get(key); + } } + + if (oldTransactionalValue == null ? transactionalValue == null : oldTransactionalValue.equalsIgnoreCase(transactionalValue)) { //this covers backward compat cases where this prop may have been set already - return; + hasValidTransactionalValue = true; + } + + if (!hasValidTransactionalValue) { + // if here, there is an attempt to set transactional to something other than 'true' + // and NOT the same value it was before + throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); + } + + if (isTransactionalPropertiesPresent) { + // Now validate transactional_properties for the table. + if (oldTransactionalValue == null) { + // If this is the first time the table is being initialized to 'transactional=true', + // any valid value can be set for the 'transactional_properties'. + initializeTransactionalProperties(newTable); + } else { + // If the table was already marked as 'transactional=true', then the new value of + // 'transactional_properties' must match the old value. Any attempt to alter the previous + // value will throw an error. An exception will still be thrown if the previous value was + // null and an attempt is made to set it. This behaviour can be changed in the future.
+ if (oldTransactionalPropertiesValue == null + || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) { + throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be " + + "altered after the table is created"); + } + } } - // if here, there is attempt to set transactional to something other than 'true' - // and NOT the same value it was before - throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); } /** @@ -157,6 +209,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + initializeTransactionalProperties(newTable); return; } @@ -187,4 +240,53 @@ private boolean conformToAcid(Table table) throws MetaException { return true; } -} \ No newline at end of file + + private void initializeTransactionalProperties(Table table) throws MetaException { + // All new versions of Acid tables created after the introduction of Acid version/type system + // can have TRANSACTIONAL_PROPERTIES property defined. This parameter can be used to change + // the operational behavior of ACID. However if this parameter is not defined, the new Acid + // tables will still behave as the old ones. This is done so to preserve the behavior + // in case of rolling downgrade. + + // Initialize transaction table properties with default string value. + String tableTransactionalProperties = null; + + Map parameters = table.getParameters(); + if (parameters != null) { + Set keys = parameters.keySet(); + for (String key : keys) { + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + tableTransactionalProperties = parameters.get(key).toLowerCase(); + parameters.remove(key); + String validationError = validateTransactionalProperties(tableTransactionalProperties); + if (validationError != null) { + throw new MetaException("Invalid transactional properties specified for the " + + "table with the error " + validationError); + } + break; + } + } + } + + if (tableTransactionalProperties != null) { + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + tableTransactionalProperties); + } + } + + private String validateTransactionalProperties(String transactionalProperties) { + boolean isValid = false; + switch (transactionalProperties) { + case DEFAULT_TRANSACTIONAL_PROPERTY: + case LEGACY_TRANSACTIONAL_PROPERTY: + isValid = true; + break; + default: + isValid = false; + } + if (!isValid) { + return "unknown value for transactional_properties"; + } + return null; // All checks passed, return null. 
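// [Editor's note: illustrative examples only, not part of this patch.] Only the two values mirrored
// above ('default' and 'legacy') pass this metastore-side validation. For instance:
//   CREATE TABLE t (a INT) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS ORC
//     TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');        -- accepted
//   ... TBLPROPERTIES ('transactional'='true', 'transactional_properties'='split_update'); -- rejected here,
// even though AcidUtils.AcidOperationalProperties.parseString() can decode that form internally.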
+ } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index db6848a..8c7d99d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -81,6 +81,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext HiveInputFormat.pushFilters(job, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } sink = work.getSink(); fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 57b6c67..584eff4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -212,6 +212,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); ts.passExecContext(getExecContext()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 26e6443..ac922ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -472,6 +472,7 @@ private void initializeOperators(Map fetchOpJobConfMap) HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index dd90a95..b85b827 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hive.ql.io; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Properties; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,10 +30,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Properties; - /** * An extension for OutputFormats that want to implement ACID transactions. * @param the row type of the file @@ -44,6 +44,7 @@ private FileSystem fs; private ObjectInspector inspector; private boolean writingBase = false; + private boolean writingDeleteDelta = false; private boolean isCompressed = false; private Properties properties; private Reporter reporter; @@ -98,6 +99,16 @@ public Options writingBase(boolean val) { } /** + * Is this writing a delete delta directory? + * @param val is this a delete delta file? + * @return this + */ + public Options writingDeleteDelta(boolean val) { + this.writingDeleteDelta = val; + return this; + } + + /** * Provide a file system to the writer. 
Otherwise, the filesystem for the * path will be used. * @param fs the file system that corresponds to the the path @@ -223,7 +234,7 @@ public Options finalDestination(Path p) { this.finalDestination = p; return this; } - + public Configuration getConfiguration() { return configuration; } @@ -260,6 +271,10 @@ public boolean isWritingBase() { return writingBase; } + public boolean isWritingDeleteDelta() { + return writingDeleteDelta; + } + public int getBucket() { return bucket; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 449d889..34025db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -30,22 +34,19 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.TransactionalValidationListener; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. 
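// [Editor's note: illustrative example only, not part of this patch.] The next hunk introduces the
// delete_delta_* directory naming alongside the existing delta_* naming. On hypothetical paths, the
// filters added there behave as follows:
//
//   Path insertDelta = new Path("/warehouse/t/delta_0000003_0000003_0000");
//   Path deleteDelta = new Path("/warehouse/t/delete_delta_0000003_0000003_0000");
//   AcidUtils.deltaFileFilter.accept(insertDelta);            // true
//   AcidUtils.deltaFileFilter.accept(deleteDelta);            // false ("delete_delta_" does not start with "delta_")
//   AcidUtils.deleteEventDeltaDirFilter.accept(deleteDelta);  // true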
@@ -61,6 +62,7 @@ public boolean accept(Path path) { } }; public static final String DELTA_PREFIX = "delta_"; + public static final String DELETE_DELTA_PREFIX = "delete_delta_"; public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; public static final PathFilter deltaFileFilter = new PathFilter() { @Override @@ -68,6 +70,12 @@ public boolean accept(Path path) { return path.getName().startsWith(DELTA_PREFIX); } }; + public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(DELETE_DELTA_PREFIX); + } + }; public static final String BUCKET_PREFIX = "bucket_"; public static final PathFilter bucketFileFilter = new PathFilter() { @Override @@ -142,6 +150,25 @@ public static String deltaSubdir(long min, long max, int statementId) { return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); } + /** + * This is format of delete delta dir name prior to Hive 2.2.x + */ + @VisibleForTesting + static String deleteDeltaSubdir(long min, long max) { + return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + + String.format(DELTA_DIGITS, max); + } + + /** + * Each write statement in a transaction creates its own delete delta dir, + * when split-update acid operational property is turned on. + * @since 2.2.x + */ + @VisibleForTesting + static String deleteDeltaSubdir(long min, long max, int statementId) { + return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); + } + public static String baseDir(long txnId) { return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); } @@ -163,12 +190,19 @@ public static Path createFilename(Path directory, } else if(options.getStatementId() == -1) { //when minor compaction runs, we collapse per statement delta files inside a single //transaction so we no longer need a statementId in the file name - subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()); + subdir = options.isWritingDeleteDelta() ? + deleteDeltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId()) + : deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId()); } else { - subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId(), - options.getStatementId()); + subdir = options.isWritingDeleteDelta() ? 
+ deleteDeltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId(), + options.getStatementId()) + : deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId(), + options.getStatementId()); } return createBucketFile(new Path(directory, subdir), options.getBucket()); } @@ -195,11 +229,10 @@ static long parseBase(Path path) { * @return the options used to create that filename */ public static AcidOutputFormat.Options - parseBaseBucketFilename(Path bucketFile, - Configuration conf) { + parseBaseOrDeltaBucketFilename(Path bucketFile, + Configuration conf) { AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf); String filename = bucketFile.getName(); - result.writingBase(true); if (ORIGINAL_PATTERN.matcher(filename).matches()) { int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_'))); @@ -207,18 +240,37 @@ static long parseBase(Path path) { .setOldStyle(true) .minimumTransactionId(0) .maximumTransactionId(0) - .bucket(bucket); + .bucket(bucket) + .writingBase(true); } else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); - result - .setOldStyle(false) - .minimumTransactionId(0) - .maximumTransactionId(parseBase(bucketFile.getParent())) - .bucket(bucket); + if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { + result + .setOldStyle(false) + .minimumTransactionId(0) + .maximumTransactionId(parseBase(bucketFile.getParent())) + .bucket(bucket) + .writingBase(true); + } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } } else { result.setOldStyle(true).bucket(-1).minimumTransactionId(0) - .maximumTransactionId(0); + .maximumTransactionId(0) + .writingBase(true); } return result; } @@ -248,6 +300,140 @@ public static DataOperationType toDataOperationType(Operation op) { } } + public static class AcidOperationalProperties { + private int description = 0x00; + public static final int SPLIT_UPDATE_BIT = 0x01; + public static final String SPLIT_UPDATE_STRING = "split_update"; + public static final int HASH_BASED_MERGE_BIT = 0x02; + public static final String HASH_BASED_MERGE_STRING = "hash_merge"; + public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY; + public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY; + + private AcidOperationalProperties() { + } + + /** + * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables + * that were created before ACID type system using operational properties was put in place. + * @return the acidOperationalProperties object + */ + public static AcidOperationalProperties getLegacy() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + // In legacy mode, none of these properties are turned on. 
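// [Editor's note, not part of this patch.] For reference: with no bits set, toInt() below yields 0,
// while getDefault() (SPLIT_UPDATE_BIT only) yields 1; these correspond to the 0/1 values documented
// for hive.txn.operational.properties in HiveConf earlier in this patch.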
+ return obj; + } + + /** + * Returns an acidOperationalProperties object that represents default ACID behavior for tables + * that do not explicitly specify/override the default behavior. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties getDefault() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + obj.setSplitUpdate(true); + obj.setHashBasedMerge(false); + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded string. + * @param propertiesStr an encoded string representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseString(String propertiesStr) { + if (propertiesStr == null) { + return AcidOperationalProperties.getLegacy(); + } + if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { + return AcidOperationalProperties.getDefault(); + } + if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { + return AcidOperationalProperties.getLegacy(); + } + AcidOperationalProperties obj = new AcidOperationalProperties(); + String[] options = propertiesStr.split("\\|"); + for (String option : options) { + if (option.trim().length() == 0) continue; // ignore empty strings + switch (option) { + case SPLIT_UPDATE_STRING: + obj.setSplitUpdate(true); + break; + case HASH_BASED_MERGE_STRING: + obj.setHashBasedMerge(true); + break; + default: + throw new IllegalArgumentException("Unexpected value for ACID operational properties!"); + } + } + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer. + * @param properties an encoded 32-bit integer representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseInt(int properties) { + AcidOperationalProperties obj = new AcidOperationalProperties(); + if ((properties & SPLIT_UPDATE_BIT) > 0) { + obj.setSplitUpdate(true); + } + if ((properties & HASH_BASED_MERGE_BIT) > 0) { + obj.setHashBasedMerge(true); + } + return obj; + } + + /** + * Sets the split update property for ACID operations based on the boolean argument. + * When split update is turned on, an update ACID event is interpreted as a combination of + * a delete event followed by an insert event. + * @param isSplitUpdate a boolean property that turns on split update when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) { + description = (isSplitUpdate + ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT)); + return this; + } + + /** + * Sets the hash-based merge property for ACID operations that combines delta files using + * GRACE hash join based approach, when turned on. (Currently unimplemented!) + * @param isHashBasedMerge a boolean property that turns on hash-based merge when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) { + description = (isHashBasedMerge + ?
(description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT)); + return this; + } + + public boolean isSplitUpdate() { + return (description & SPLIT_UPDATE_BIT) > 0; + } + + public boolean isHashBasedMerge() { + return (description & HASH_BASED_MERGE_BIT) > 0; + } + + public int toInt() { + return description; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + if (isSplitUpdate()) { + str.append("|" + SPLIT_UPDATE_STRING); + } + if (isHashBasedMerge()) { + str.append("|" + HASH_BASED_MERGE_STRING); + } + return str.toString(); + } + } + public static interface Directory { /** @@ -287,18 +473,20 @@ public static DataOperationType toDataOperationType(Operation op) { //-1 is for internal (getAcidState()) purposes and means the delta dir //had no statement ID private final int statementId; + private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' /** * for pre 1.3.x delta files */ - ParsedDelta(long min, long max, FileStatus path) { - this(min, max, path, -1); + ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) { + this(min, max, path, -1, isDeleteDelta); } - ParsedDelta(long min, long max, FileStatus path, int statementId) { + ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) { this.minTransaction = min; this.maxTransaction = max; this.path = path; this.statementId = statementId; + this.isDeleteDelta = isDeleteDelta; } public long getMinTransaction() { @@ -317,6 +505,10 @@ public int getStatementId() { return statementId == -1 ? 0 : statementId; } + public boolean isDeleteDelta() { + return isDeleteDelta; + } + /** * Compactions (Major/Minor) merge deltas/bases but delete of old files * happens in a different process; thus it's possible to have bases/deltas with @@ -382,6 +574,12 @@ else if(statementId != parsedDelta.statementId) { List result = new ArrayList<>(deltas.size()); AcidInputFormat.DeltaMetaData last = null; for(ParsedDelta parsedDelta : deltas) { + if (!parsedDelta.isDeleteDelta() && !parsedDelta.getPath().getName().startsWith(DELTA_PREFIX)) { + continue; // when serializing the deltas, skip anything which does not start with delta prefix. + } + if (parsedDelta.isDeleteDelta() && !parsedDelta.getPath().getName().startsWith(DELETE_DELTA_PREFIX)) { + continue; // when serializing the delete_deltas, skip anything which does not start with delete_delta prefix. + } if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { last.getStmtIds().add(parsedDelta.getStatementId()); continue; @@ -418,15 +616,49 @@ else if(statementId != parsedDelta.statementId) { return results.toArray(new Path[results.size()]); } - private static ParsedDelta parseDelta(FileStatus path) { - ParsedDelta p = parsedDelta(path.getPath()); - return new ParsedDelta(p.getMinTransaction(), - p.getMaxTransaction(), path, p.statementId); + /** + * Convert the list of begin/end transaction id pairs to a list of delete delta + * directories. 
Note that there may be multiple delta files for the exact same txn range starting + * with 2.2.x; + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} + * @param root the root directory + * @param deleteDeltas list of begin/end transaction id pairs + * @return the list of delta paths + */ + public static Path[] deserializeDeleteDeltas(Path root, final List deleteDeltas) throws IOException { + List results = new ArrayList(deleteDeltas.size()); + for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) { + if(dmd.getStmtIds().isEmpty()) { + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); + continue; + } + for(Integer stmtId : dmd.getStmtIds()) { + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); + } + } + return results.toArray(new Path[results.size()]); } + public static ParsedDelta parsedDelta(Path deltaDir) { + String deltaDirName = deltaDir.getName(); + if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) { + return parsedDelta(deltaDir, DELETE_DELTA_PREFIX); + } + return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix + } + + private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) { + ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix); + boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); + return new ParsedDelta(p.getMinTransaction(), + p.getMaxTransaction(), path, p.statementId, isDeleteDelta); + } + + public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) { String filename = deltaDir.getName(); - if (filename.startsWith(DELTA_PREFIX)) { - String rest = filename.substring(DELTA_PREFIX.length()); + boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); + if (filename.startsWith(deltaPrefix)) { + String rest = filename.substring(deltaPrefix.length()); int split = rest.indexOf('_'); int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId long min = Long.parseLong(rest.substring(0, split)); @@ -434,13 +666,13 @@ public static ParsedDelta parsedDelta(Path deltaDir) { Long.parseLong(rest.substring(split + 1)) : Long.parseLong(rest.substring(split + 1, split2)); if(split2 == -1) { - return new ParsedDelta(min, max, null); + return new ParsedDelta(min, max, null, isDeleteDelta); } int statementId = Integer.parseInt(rest.substring(split2 + 1)); - return new ParsedDelta(min, max, null, statementId); + return new ParsedDelta(min, max, null, statementId, isDeleteDelta); } throw new IllegalArgumentException(deltaDir + " does not start with " + - DELTA_PREFIX); + deltaPrefix); } /** @@ -456,7 +688,8 @@ public static boolean isAcid(Path directory, for(FileStatus file: fs.listStatus(directory)) { String filename = file.getPath().getName(); if (filename.startsWith(BASE_PREFIX) || - filename.startsWith(DELTA_PREFIX)) { + filename.startsWith(DELTA_PREFIX) || + filename.startsWith(DELETE_DELTA_PREFIX)) { if (file.isDir()) { return true; } @@ -499,6 +732,7 @@ public static Directory getAcidState(Path directory, boolean ignoreEmptyFiles ) throws IOException { FileSystem fs = directory.getFileSystem(conf); + // The following 'deltas' includes all kinds of delta files including insert & delete deltas. final List deltas = new ArrayList(); List working = new ArrayList(); List originalDirectories = new ArrayList(); @@ -553,6 +787,7 @@ public static Directory getAcidState(Path directory, //subject to list of 'exceptions' in 'txnList' (not show in above example). 
long current = bestBase.txn; int lastStmtId = -1; + ParsedDelta prev = null; for(ParsedDelta next: working) { if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? @@ -561,6 +796,7 @@ public static Directory getAcidState(Path directory, deltas.add(next); current = next.maxTransaction; lastStmtId = next.statementId; + prev = next; } } else if(next.maxTransaction == current && lastStmtId >= 0) { @@ -568,6 +804,24 @@ else if(next.maxTransaction == current && lastStmtId >= 0) { //generate multiple delta files with the same txnId range //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete deltas.add(next); + prev = next; + } + else if (prev != null && next.maxTransaction == prev.maxTransaction + && next.minTransaction == prev.minTransaction + && next.statementId == prev.statementId) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and the delete + // deltas for the same range. That is, if we had 3 directories, delta_30_30, + // delete_delta_40_40 and delta_50_50, then running minor compaction would produce + // delta_30_50 and delete_delta_30_50. + + deltas.add(next); + prev = next; } else { obsolete.add(next.path); @@ -638,7 +892,8 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, - List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) { + List original, List obsolete, TxnBase bestBase, + boolean ignoreEmptyFiles) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { @@ -662,8 +917,11 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } else { obsolete.add(child); } - } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) { - ParsedDelta delta = parseDelta(child); + } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) + && child.isDir()) { + String deltaPrefix = + (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; + ParsedDelta delta = parseDelta(child, deltaPrefix); if (txnList.isTxnRangeValid(delta.minTransaction, delta.maxTransaction) != ValidTxnList.RangeResponse.NONE) { @@ -791,4 +1049,84 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + + /** + * Sets the acidOperationalProperties in the configuration object argument. + * @param conf Mutable configuration object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties(Configuration conf, + AcidOperationalProperties properties) { + if (properties != null) { + HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt()); + } + } + + /** + * Sets the acidOperationalProperties in the map object argument. 
+ * @param parameters Mutable map object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties( + Map parameters, AcidOperationalProperties properties) { + if (properties != null) { + parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString()); + } + } + + /** + * Returns the acidOperationalProperties for a given table. + * @param table A table object + * @return the acidOperationalProperties object for the corresponding table. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Table table) { + String transactionalProperties = table.getProperty( + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (transactionalProperties == null) { + // If the table does not define any transactional properties, we return a legacy type. + return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(transactionalProperties); + } + + /** + * Returns the acidOperationalProperties for a given configuration. + * @param conf A configuration object + * @return the acidOperationalProperties object for the corresponding configuration. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) { + // If the conf does not define any transactional properties, the parseInt() should receive + // a value of zero, which will set AcidOperationalProperties to a legacy type and return that. + return AcidOperationalProperties.parseInt( + HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES)); + } + + /** + * Returns the acidOperationalProperties for a given set of properties. + * @param props A properties object + * @return the acidOperationalProperties object for the corresponding properties. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Properties props) { + String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the properties does not define any transactional properties, we return a legacy type. + return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } + + /** + * Returns the acidOperationalProperties for a given map. + * @param parameters A parameters object + * @return the acidOperationalProperties object for the corresponding map. + */ + public static AcidOperationalProperties getAcidOperationalProperties( + Map parameters) { + String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the parameters does not define any transactional properties, we return a legacy type. 
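The getters above all funnel into AcidOperationalProperties.parseString()/parseInt() and fall back to the legacy behavior when nothing is set. The class itself is defined elsewhere in this patch; the sketch below only illustrates the bit-flag idea it appears to rely on (a single split-update bit), and the handling of the 'default'/'legacy' strings and all member names are assumptions for illustration only.

// Rough sketch of a bit-flag properties holder; not the real AcidUtils.AcidOperationalProperties.
public class AcidOperationalPropertiesSketch {
  private static final int SPLIT_UPDATE_BIT = 0x01;
  private int description = 0;

  public static AcidOperationalPropertiesSketch getLegacy() {
    return new AcidOperationalPropertiesSketch();      // all bits off: legacy ACID behavior
  }

  public static AcidOperationalPropertiesSketch getDefault() {
    AcidOperationalPropertiesSketch p = new AcidOperationalPropertiesSketch();
    p.description |= SPLIT_UPDATE_BIT;                 // assumed: 'default' enables split-update
    return p;
  }

  public static AcidOperationalPropertiesSketch parseInt(int v) {
    AcidOperationalPropertiesSketch p = new AcidOperationalPropertiesSketch();
    p.description = v;
    return p;
  }

  public static AcidOperationalPropertiesSketch parseString(String s) {
    if (s == null || s.equalsIgnoreCase("legacy")) {
      return getLegacy();
    }
    if (s.equalsIgnoreCase("default")) {
      return getDefault();
    }
    return parseInt(Integer.parseInt(s));
  }

  public boolean isSplitUpdate() {
    return (description & SPLIT_UPDATE_BIT) != 0;
  }

  public int toInt() {
    return description;
  }

  @Override
  public String toString() {
    return Integer.toString(description);
  }

  public static void main(String[] args) {
    System.out.println(parseString("default").isSplitUpdate()); // true
    System.out.println(parseString(null).isSplitUpdate());      // false: legacy when unset
    System.out.println(parseInt(0).isSplitUpdate());            // false: matches the conf default of 0
  }
}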
+ return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 945b828..c4b9940 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -637,6 +637,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass pushFilters(jobConf, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 334cb31..0586d0d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; import org.apache.hadoop.hive.ql.io.BatchToRowReader; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; @@ -639,17 +640,20 @@ public static void clearLocalCache() { @VisibleForTesting static final class AcidDirInfo { public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, - List baseOrOriginalFiles) { + List baseOrOriginalFiles, + List parsedDeltas) { this.splitPath = splitPath; this.acidInfo = acidInfo; this.baseOrOriginalFiles = baseOrOriginalFiles; this.fs = fs; + this.parsedDeltas = parsedDeltas; } final FileSystem fs; final Path splitPath; final AcidUtils.Directory acidInfo; final List baseOrOriginalFiles; + final List parsedDeltas; } @VisibleForTesting @@ -1032,13 +1036,66 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, - context.conf, context.transactionList, useFileIds, true); + AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, + context.transactionList, useFileIds, true); Path base = dirInfo.getBaseDirectory(); // find the base files (original or new style) - List children = (base == null) - ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds); - return new AcidDirInfo(fs, dir, dirInfo, children); + List baseOrOriginalFiles = new ArrayList(); + if (base == null) { + baseOrOriginalFiles.addAll(dirInfo.getOriginalFiles()); + } else { + baseOrOriginalFiles.addAll(findBaseFiles(base, useFileIds)); + } + + // Find the parsed deltas- some of them containing only the insert delta events + // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details) + List parsedDeltas = new ArrayList(); + + // Determine the transactional_properties of the table from the job conf stored in context. + // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(), + // & therefore we should be able to retrieve them here and determine appropriate behavior. 
+ String transactional_properties = + context.conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.AcidOperationalProperties.parseString(transactional_properties); + if (acidOperationalProperties.isSplitUpdate()) { + // If we have split-update turned on for this table, then the delta events have already been + // split into two directories- delta_x_y/ and delete_delta_x_y/. + // When you have split-update turned on, the insert events go to delta_x_y/ directory and all + // the delete events go to delete_x_y/. An update event will generate two events- + // a delete event for the old record that is put into delete_delta_x_y/, + // followed by an insert event for the updated record put into the usual delta_x_y/. + // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/ + // can be treated like base files. Hence, each of these are added to baseOrOriginalFiles list. + + for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) { + if (parsedDelta.isDeleteDelta()) { + parsedDeltas.add(parsedDelta); + } else { + // This is a normal insert delta, which only has insert events and hence all the files + // in this delta directory can be considered as a base. + if (useFileIds) { + try { + baseOrOriginalFiles.addAll(SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter)); + continue; // move on to process to the next parsedDelta. + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API: " + t.getMessage()); + } + } + // Fall back to regular API and create statuses without ID. + List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter); + for (FileStatus child : children) { + baseOrOriginalFiles.add(AcidUtils.createOriginalObj(null, child)); + } + } + } + + } else { + // When split-update is not enabled, then all the deltas in the current directories + // should be considered as usual. + parsedDeltas.addAll(dirInfo.getCurrentDirectories()); + } + return new AcidDirInfo(fs, dir, dirInfo, baseOrOriginalFiles, parsedDeltas); } private List findBaseFiles( @@ -1529,7 +1586,7 @@ private long computeProjectionSize(List fileTypes, // We have received a new directory information, make a split strategy. --resultsLeft; SplitStrategy splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, readerTypes, ugi, + adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, adi.parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy == null) continue; // Combined. @@ -1763,9 +1820,18 @@ public float getProgress() throws IOException { final OrcSplit split = (OrcSplit) inputSplit; final Path path = split.getPath(); + + // Infer whether its an original file or base/delta file based on the path. 
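To restate the callInternal() branch above: when split-update is on, files under delta_x_y are handed to the split generator as if they were base files, and only delete_delta_x_y directories remain in parsedDeltas; when it is off, every current delta is kept as a delta. A simplified, self-contained sketch of that routing follows (ParsedDeltaInfo is a stand-in, not the real AcidUtils.ParsedDelta).

import java.util.ArrayList;
import java.util.List;

public class SplitUpdateRoutingSketch {
  static class ParsedDeltaInfo {
    final String dirName;
    final boolean deleteDelta;
    ParsedDeltaInfo(String dirName, boolean deleteDelta) {
      this.dirName = dirName;
      this.deleteDelta = deleteDelta;
    }
  }

  public static void main(String[] args) {
    boolean splitUpdate = true; // would come from transactional_properties in the job conf
    List<ParsedDeltaInfo> current = new ArrayList<>();
    current.add(new ParsedDeltaInfo("delta_0000010_0000010_0000", false));
    current.add(new ParsedDeltaInfo("delete_delta_0000010_0000010_0000", true));

    List<String> baseLike = new ArrayList<>();      // read like base/original files
    List<String> deltasToMerge = new ArrayList<>(); // merged later as delete events
    for (ParsedDeltaInfo d : current) {
      if (splitUpdate && !d.deleteDelta) {
        baseLike.add(d.dirName);        // insert-only delta: treat its files as base files
      } else {
        deltasToMerge.add(d.dirName);   // delete deltas (or all deltas in legacy mode)
      }
    }
    System.out.println("base-like: " + baseLike);         // [delta_0000010_0000010_0000]
    System.out.println("merge deltas: " + deltasToMerge); // [delete_delta_0000010_0000010_0000]
  }
}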
+ boolean isOriginal = true; + if (path.getParent().getName().startsWith(AcidUtils.BASE_PREFIX) + || path.getParent().getName().startsWith(AcidUtils.DELTA_PREFIX) + || path.getParent().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + isOriginal = false; + } + Path root; if (split.hasBase()) { - if (split.isOriginal()) { + if (isOriginal) { root = path.getParent(); } else { root = path.getParent().getParent(); @@ -1773,7 +1839,20 @@ public float getProgress() throws IOException { } else { root = path; } - final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); + + // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat. + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + + // The deltas are decided based on whether split-update has been turned on for the table or not. + // When split-update is turned off, everything in the delta_x_y/ directory should be treated + // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory + // need to be considered as delta, because files in delta_x_y/ will be processed as base files + // since they only have insert events in them. + final Path[] deltas = + acidOperationalProperties.isSplitUpdate() ? + AcidUtils.deserializeDeleteDeltas(root, split.getDeltas()) + : AcidUtils.deserializeDeltas(root, split.getDeltas()); final Configuration conf = options.getConfiguration(); @@ -1793,7 +1872,7 @@ public float getProgress() throws IOException { setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); if (split.hasBase()) { - bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) + bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf) .getBucket(); OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf) .maxLength(split.getFileLength()); @@ -1809,7 +1888,7 @@ public float getProgress() throws IOException { ValidTxnList validTxnList = txnString == null ? 
new ValidReadTxnList() : new ValidReadTxnList(txnString); final OrcRawRecordMerger records = - new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, + new OrcRawRecordMerger(conf, true, reader, isOriginal, bucket, validTxnList, readOptions, deltas); return new RowReader() { OrcStruct innerRecord = records.createValue(); @@ -1950,21 +2029,24 @@ private static boolean isStripeSatisfyPredicate( @VisibleForTesting static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, AcidUtils.Directory dirInfo, - List baseOrOriginalFiles, List readerTypes, + List baseOrOriginalFiles, + List parsedDeltas, + List readerTypes, UserGroupInformation ugi, boolean allowSyntheticFileIds) { Path base = dirInfo.getBaseDirectory(); List original = dirInfo.getOriginalFiles(); - List deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); + List deltas = AcidUtils.serializeDeltas(parsedDeltas); boolean[] covered = new boolean[context.numBuckets]; - boolean isOriginal = base == null; + boolean isOriginal = (base == null) && !(original.isEmpty()); // if we have a base to work from - if (base != null || !original.isEmpty()) { + if (base != null || !original.isEmpty() || !baseOrOriginalFiles.isEmpty()) { long totalFileSize = 0; for (HdfsFileStatusWithId child : baseOrOriginalFiles) { totalFileSize += child.getFileStatus().getLen(); - AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename + AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename (child.getFileStatus().getPath(), context.conf); + opts.writingBase(true); int b = opts.getBucket(); // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index efde2db..229f899 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -146,7 +146,7 @@ public int compareTo(RecordIdentifier other) { private boolean isSameRow(ReaderKey other) { return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; } - + public long getCurrentTransactionId() { return currentTransactionId; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 1a1af28..7bf59b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -25,11 +25,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.orc.impl.AcidStats; -import org.apache.orc.impl.OrcAcidUtils; -import org.apache.orc.OrcConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -42,10 +37,16 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -79,9 +80,13 @@ private static final Charset UTF8 = Charset.forName("UTF-8"); private final AcidOutputFormat.Options options; + private final AcidUtils.AcidOperationalProperties acidOperationalProperties; private final Path path; + private Path deleteEventPath; private final FileSystem fs; + private OrcFile.WriterOptions writerOptions; private Writer writer; + private Writer deleteEventWriter = null; private final FSDataOutputStream flushLengths; private final OrcStruct item; private final IntWritable operation = new IntWritable(); @@ -95,13 +100,16 @@ // because that is monotonically increasing to give new unique row ids. private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private KeyIndexBuilder deleteEventIndexBuilder; private StructField recIdField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in private StructField originalTxnField = null; // field inside recId to look for original txn in + private StructField bucketField = null; // field inside recId to look for bucket in private StructObjectInspector rowInspector; // OI for the original row private StructObjectInspector recIdInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier private LongObjectInspector origTxnInspector; // OI for the original txn inside the record + private IntObjectInspector bucketInspector; // OI for the bucket inside the record // identifer static int getOperation(OrcStruct struct) { @@ -180,8 +188,19 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; + // Initialize acidOperationalProperties based on table properties, and + // if they are not available, see if we can find it in the job configuration. 
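The constructor logic that follows describes a simple precedence: table properties win, and the job configuration is only the fallback. A compact sketch of that lookup order, with hypothetical helper names standing in for the AcidUtils.getAcidOperationalProperties(...) overloads:

import java.util.Properties;

public class AcidPropsPrecedenceSketch {
  // Hypothetical stand-ins; 1 means the split-update bit is set, 0 means legacy.
  static int fromTableProperties(Properties props) {
    String v = props.getProperty("transactional_properties");
    return (v == null || v.equalsIgnoreCase("legacy")) ? 0 : 1;
  }

  static int fromConf(int confValue) {
    return confValue; // hive.txn.operational.properties, 0 when unset (legacy)
  }

  static int resolve(Properties tableProperties, int confValue) {
    // Table properties take precedence; the conf is consulted only when they are absent.
    return (tableProperties != null) ? fromTableProperties(tableProperties) : fromConf(confValue);
  }

  public static void main(String[] args) {
    Properties tblProps = new Properties();
    tblProps.setProperty("transactional_properties", "default");
    int confValue = 0; // what the job conf reports when the property was never copied into it

    System.out.println(resolve(tblProps, confValue)); // 1: table properties win
    System.out.println(resolve(null, confValue));     // 0: fall back to the conf (legacy)
  }
}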
+ if (options.getTableProperties() != null) { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getTableProperties()); + } else { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + } this.bucket.set(options.getBucket()); this.path = AcidUtils.createFilename(path, options); + this.deleteEventWriter = null; + this.deleteEventPath = null; FileSystem fs = options.getFilesystem(); if (fs == null) { fs = path.getFileSystem(options.getConfiguration()); @@ -205,7 +224,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { } else { flushLengths = null; } - OrcFile.WriterOptions writerOptions = null; + this.writerOptions = null; // If writing delta dirs, we need to make a clone of original options, to avoid polluting it for // the base writer if (options.isWritingBase()) { @@ -242,6 +261,13 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.inspector(createEventSchema(findRecId(options.getInspector(), options.getRecordIdColumn()))); this.writer = OrcFile.createWriter(this.path, writerOptions); + if (this.acidOperationalProperties.isSplitUpdate()) { + // If this is a split-update, we initialize a delete delta file path in anticipation that + // they would write update/delete events to that separate file. + // This writes to a file in directory which starts with "delete_delta_..." + // The actual initialization of a writer only happens if any delete events are written. + this.deleteEventPath = AcidUtils.createFilename(path, options.writingDeleteDelta(true)); + } item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); item.setFieldValue(CURRENT_TRANSACTION, currentTransaction); @@ -250,6 +276,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } + @Override public String toString() { return getClass().getName() + "[" + path +"]"; } @@ -264,14 +291,16 @@ private long findRowIdOffsetForInsert() throws IOException { * 1. need to know bucket we are writing to * 2. need to know which delta dir it's in * Then, - * 1. find the same bucket file in previous delta dir for this txn + * 1. find the same bucket file in previous (insert) delta dir for this txn + * (Note: in case of split_update, we can ignore the delete_delta dirs) * 2. read the footer and get AcidStats which has insert count - * 2.1 if AcidStats.inserts>0 done + * 2.1 if AcidStats.inserts>0 add to the insert count. * else go to previous delta file * For example, consider insert/update/insert case...*/ if(options.getStatementId() <= 0) { return 0;//there is only 1 statement in this transaction (so far) } + long totalInserts = 0; for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); if(!fs.exists(matchingBucket)) { @@ -281,12 +310,10 @@ private long findRowIdOffsetForInsert() throws IOException { //no close() on Reader?! AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); if(acidStats.inserts > 0) { - return acidStats.inserts; + totalInserts += acidStats.inserts; } } - //if we got here, we looked at all delta files in this txn, prior to current statement and didn't - //find any inserts... 
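The findRowIdOffsetForInsert() change nearby turns an early return into an accumulation: the row-id offset for the current statement is the total insert count over all earlier statements of the same transaction, not just the most recent non-empty one. A tiny worked sketch of that arithmetic, where the per-statement counts stand in for AcidStats.inserts read from each prior delta's footer:

public class RowIdOffsetSketch {
  // insertsPerStatement[i] stands in for AcidStats.inserts parsed from the delta of statement i.
  static long offsetForStatement(long[] insertsPerStatement, int statementId) {
    long totalInserts = 0;
    for (int pastStmt = statementId - 1; pastStmt >= 0; pastStmt--) {
      totalInserts += insertsPerStatement[pastStmt];
    }
    return totalInserts;
  }

  public static void main(String[] args) {
    long[] inserts = {3, 0, 5};                          // statements 0, 1, 2 of the same txn
    System.out.println(offsetForStatement(inserts, 3));  // 8, not the 5 the old early return gave
  }
}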
- return 0; + return totalInserts; } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. @@ -307,6 +334,8 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { // in RecordIdentifier is transactionId, bucketId, rowId originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); + bucketField = fields.get(1); + bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector(); rowIdField = fields.get(2); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); @@ -316,7 +345,7 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { } } - private void addEvent(int operation, long currentTransaction, long rowId, Object row) + private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); @@ -334,11 +363,53 @@ else if(operation == INSERT_OPERATION) { } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); } + private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) + throws IOException { + if (operation == INSERT_OPERATION) { + // Just insert the record in the usual way, i.e., default to the simple behavior. + addSimpleEvent(operation, currentTransaction, rowId, row); + return; + } + this.operation.set(operation); + this.currentTransaction.set(currentTransaction); + Object rowValue = rowInspector.getStructFieldData(row, recIdField); + long originalTransaction = origTxnInspector.get( + recIdInspector.getStructFieldData(rowValue, originalTxnField)); + rowId = rowIdInspector.get( + recIdInspector.getStructFieldData(rowValue, rowIdField)); + + if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + // Initialize a deleteEventWriter if not yet done. (Lazy initialization) + if (deleteEventWriter == null) { + // Initialize an indexBuilder for deleteEvents. + deleteEventIndexBuilder = new KeyIndexBuilder(); + // Change the indexBuilder callback too for the deleteEvent file, the remaining writer + // options remain the same. + this.deleteEventWriter = OrcFile.createWriter(deleteEventPath, + writerOptions.callback(deleteEventIndexBuilder)); + } + + // A delete/update generates a delete event for the original row. + this.rowId.set(rowId); + this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); + item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. + deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); + deleteEventWriter.addRow(item); + } + + if (operation == UPDATE_OPERATION) { + // A new row is also inserted in the usual delta file for an update event. 
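To restate the routing implemented by addSplitUpdateEvent() above: an insert stays in delta_x_y, a delete produces only a delete event in delete_delta_x_y, and an update is decomposed into a delete event against the original row id followed by a fresh insert event for the updated record. A minimal sketch of that decomposition, with the two writers replaced by plain lists and illustrative operation codes:

import java.util.ArrayList;
import java.util.List;

public class SplitUpdateEventSketch {
  static final int INSERT = 0, UPDATE = 1, DELETE = 2;   // illustrative operation codes

  final List<String> insertDelta = new ArrayList<>();    // stands in for the delta_x_y writer
  final List<String> deleteDelta = new ArrayList<>();    // stands in for the delete_delta_x_y writer
  long insertedRows = 0;

  void addSplitUpdateEvent(int operation, long originalRowId, String row) {
    if (operation == INSERT) {
      insertDelta.add("insert rowId=" + (insertedRows++) + " row=" + row);
      return;
    }
    // DELETE and UPDATE both emit a delete event against the original row id; ROW is null.
    deleteDelta.add("delete rowId=" + originalRowId);
    if (operation == UPDATE) {
      // The updated image is re-inserted as a brand-new row in the insert delta.
      insertDelta.add("insert rowId=" + (insertedRows++) + " row=" + row);
    }
  }

  public static void main(String[] args) {
    SplitUpdateEventSketch s = new SplitUpdateEventSketch();
    s.addSplitUpdateEvent(INSERT, -1, "r1");
    s.addSplitUpdateEvent(UPDATE, 0, "r1'");
    s.addSplitUpdateEvent(DELETE, 1, "r1'");
    System.out.println(s.insertDelta); // [insert rowId=0 row=r1, insert rowId=1 row=r1']
    System.out.println(s.deleteDelta); // [delete rowId=0, delete rowId=1]
  }
}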
+ addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } + } + @Override public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { @@ -347,7 +418,11 @@ public void insert(long currentTransaction, Object row) throws IOException { //always true in that case rowIdOffset = findRowIdOffsetForInsert(); } - addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } else { + addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } rowCountDelta++; } @@ -355,8 +430,13 @@ public void insert(long currentTransaction, Object row) throws IOException { public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; + rowIdOffset = findRowIdOffsetForInsert(); + } + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row); + } else { + addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } - addEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } @Override @@ -364,9 +444,12 @@ public void delete(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, -1, row); + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row); + } else { + addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row); + } rowCountDelta--; - } @Override @@ -390,13 +473,38 @@ public void close(boolean abort) throws IOException { fs.delete(path, false); } } else { - if (writer != null) writer.close(); + if (writer != null) { + if (acidOperationalProperties.isSplitUpdate()) { + // When split-update is enabled, we can choose not to write + // any delta files when there are no inserts. In such cases only the delete_deltas + // would be written & they are closed separately below. + if (indexBuilder.acidStats.inserts > 0) { + writer.close(); // normal close, when there are inserts. + } else { + // Just remove insert delta paths, when there are no insert events. + fs.delete(path, false); + } + } else { + writer.close(); // normal close. + } + } + if (deleteEventWriter != null) { + if (deleteEventIndexBuilder.acidStats.deletes > 0) { + // Only need to write out & close the delete_delta if there have been any. + deleteEventWriter.close(); + } else { + // Just remove delete_delta, if there have been no delete events. 
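The close(boolean) changes above boil down to one rule: with split-update, a delta or delete_delta bucket file is only kept if it actually received events, otherwise its anticipated path is removed. A condensed sketch of that rule, using java.nio and a hypothetical writer interface in place of the ORC writer and Hadoop FileSystem:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CloseTimeCleanupSketch {
  interface SketchWriter { void close() throws IOException; } // stand-in for the ORC Writer

  static void closeWriters(boolean splitUpdate, long inserts, long deletes,
      SketchWriter insertWriter, Path insertPath,
      SketchWriter deleteWriter, Path deletePath) throws IOException {
    if (insertWriter != null) {
      if (!splitUpdate || inserts > 0) {
        insertWriter.close();             // keep delta_x_y: legacy mode, or it has insert events
      } else {
        Files.deleteIfExists(insertPath); // split-update with no inserts: drop the empty delta
      }
    }
    if (deleteWriter != null) {
      if (deletes > 0) {
        deleteWriter.close();             // keep delete_delta_x_y only if delete events exist
      } else {
        Files.deleteIfExists(deletePath); // otherwise remove the anticipated delete_delta path
      }
    }
  }
}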
+ fs.delete(deleteEventPath, false); + } + } + } if (flushLengths != null) { flushLengths.close(); fs.delete(OrcAcidUtils.getSideFile(path), false); } writer = null; + deleteEventWriter = null; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 8cb5e8a..5f53aef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -61,6 +61,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -3167,10 +3168,11 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List newFiles) throws HiveException { - // The layout for ACID files is table|partname/base|delta/bucket + // The layout for ACID files is table|partname/base|delta|delete_delta/bucket // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta/bucket. So we need to move that into the above structure. - // For the first mover there will be no delta directory, so we can move the whole directory. + // it will look like bucket/delta|delete_delta/bucket. So we need to move that into + // the above structure. For the first mover there will be no delta directory, + // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. @@ -3193,49 +3195,58 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, for (FileStatus origBucketStat : origBucketStats) { Path origBucketPath = origBucketStat.getPath(); - LOG.debug("Acid move looking for delta files in bucket " + origBucketPath); + moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter, + fs, dst, origBucketPath, createdDeltaDirs, newFiles); + moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, + fs, dst,origBucketPath, createdDeltaDirs, newFiles); + } + } + } - FileStatus[] deltaStats = null; - try { - deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter); - } catch (IOException e) { - throw new HiveException("Unable to look for delta files in original bucket " + - origBucketPath.toUri().toString(), e); - } - LOG.debug("Acid move found " + deltaStats.length + " delta files"); - - for (FileStatus deltaStat : deltaStats) { - Path deltaPath = deltaStat.getPath(); - // Create the delta directory. Don't worry if it already exists, - // as that likely means another task got to it first. Then move each of the buckets. - // it would be more efficient to try to move the delta with it's buckets but that is - // harder to make race condition proof. 
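The Hive.java refactor below extracts the per-prefix move into moveAcidDeltaFiles(), which is then invoked once for delta_ directories and once for delete_delta_ directories. A simplified java.nio sketch of that move follows; the real code uses the Hadoop FileSystem API and a PathFilter, and the paths here are illustrative only.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class AcidMoveSketch {

  // Moves origBucketDir/<prefix>*/bucket_* files under dst/<prefix>*/, creating target dirs.
  static void moveDeltasWithPrefix(Path origBucketDir, Path dst, String prefix)
      throws IOException {
    try (DirectoryStream<Path> deltas = Files.newDirectoryStream(origBucketDir, prefix + "*")) {
      for (Path delta : deltas) {
        Path deltaDest = dst.resolve(delta.getFileName().toString());
        Files.createDirectories(deltaDest); // harmless if another task created it already
        try (DirectoryStream<Path> buckets = Files.newDirectoryStream(delta, "bucket_*")) {
          for (Path bucket : buckets) {
            Files.move(bucket, deltaDest.resolve(bucket.getFileName().toString()));
          }
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path staging = Paths.get("/tmp/staging/bucket_0");   // illustrative paths only
    Path tableDir = Paths.get("/tmp/warehouse/acidtbl");
    if (Files.isDirectory(staging)) {
      moveDeltasWithPrefix(staging, tableDir, "delta_");
      moveDeltasWithPrefix(staging, tableDir, "delete_delta_");
    }
  }
}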
- Path deltaDest = new Path(dst, deltaPath.getName()); + private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs, + Path dst, Path origBucketPath, Set createdDeltaDirs, + List newFiles) throws HiveException { + LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath); + + FileStatus[] deltaStats = null; + try { + deltaStats = fs.listStatus(origBucketPath, pathFilter); + } catch (IOException e) { + throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " + + origBucketPath.toUri().toString(), e); + } + LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files"); + + for (FileStatus deltaStat : deltaStats) { + Path deltaPath = deltaStat.getPath(); + // Create the delta directory. Don't worry if it already exists, + // as that likely means another task got to it first. Then move each of the buckets. + // it would be more efficient to try to move the delta with it's buckets but that is + // harder to make race condition proof. + Path deltaDest = new Path(dst, deltaPath.getName()); + try { + if (!createdDeltaDirs.contains(deltaDest)) { try { - if (!createdDeltaDirs.contains(deltaDest)) { - try { - fs.mkdirs(deltaDest); - createdDeltaDirs.add(deltaDest); - } catch (IOException swallowIt) { - // Don't worry about this, as it likely just means it's already been created. - LOG.info("Unable to create delta directory " + deltaDest + - ", assuming it already exists: " + swallowIt.getMessage()); - } - } - FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); - LOG.debug("Acid move found " + bucketStats.length + " bucket files"); - for (FileStatus bucketStat : bucketStats) { - Path bucketSrc = bucketStat.getPath(); - Path bucketDest = new Path(deltaDest, bucketSrc.getName()); - LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + - bucketDest.toUri().toString()); - fs.rename(bucketSrc, bucketDest); - if (newFiles != null) newFiles.add(bucketDest); - } - } catch (IOException e) { - throw new HiveException("Error moving acid files " + e.getMessage(), e); + fs.mkdirs(deltaDest); + createdDeltaDirs.add(deltaDest); + } catch (IOException swallowIt) { + // Don't worry about this, as it likely just means it's already been created. 
+ LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest + + ", assuming it already exists: " + swallowIt.getMessage()); } } + FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); + LOG.debug("Acid move found " + bucketStats.length + " bucket files"); + for (FileStatus bucketStat : bucketStats) { + Path bucketSrc = bucketStat.getPath(); + Path bucketDest = new Path(deltaDest, bucketSrc.getName()); + LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + + bucketDest.toUri().toString()); + fs.rename(bucketSrc, bucketDest); + if (newFiles != null) newFiles.add(bucketDest); + } + } catch (IOException e) { + throw new HiveException("Error moving acid files " + e.getMessage(), e); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 8cf261d..47c65bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -101,6 +101,8 @@ private boolean isAcidTable; + private AcidUtils.AcidOperationalProperties acidOperationalProperties = null; + private transient TableSample tableSample; private transient Table tableMetadata; @@ -127,6 +129,9 @@ public TableScanDesc(final String alias, List vcs, Table tblMetad this.virtualCols = vcs; this.tableMetadata = tblMetadata; isAcidTable = AcidUtils.isAcidTable(this.tableMetadata); + if (isAcidTable) { + acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata); + } } @Override @@ -159,6 +164,10 @@ public boolean isAcidTable() { return isAcidTable; } + public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() { + return acidOperationalProperties; + } + @Explain(displayName = "Output", explainLevels = { Level.USER }) public List getOutputColumnNames() { return this.neededColumns; diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 6caca98..c3e3982 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,9 +17,16 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.hive.common.ValidCompactorTxnList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Matcher; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -27,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StringableMap; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -61,12 +69,8 @@ import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.util.StringUtils; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.*; -import java.util.regex.Matcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to do 
compactions via an MR job. This has to be in the ql package rather than metastore @@ -129,7 +133,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable - if (ci.properties != null) { // override MR properties and general tblproperties if applicable + if (ci.properties != null) { overrideTblProps(job, t.getParameters(), ci.properties); } setColumnTypes(job, sd.getCols()); @@ -137,6 +141,11 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter //to do the final move job.setBoolean("mapreduce.map.speculative", false); + + // Set appropriate Acid readers/writers based on the table properties. + AcidUtils.setAcidOperationalProperties(job, + AcidUtils.getAcidOperationalProperties(t.getParameters())); + return job; } @@ -501,12 +510,18 @@ public String toString() { Map splitToBucketMap = new HashMap(); for (Path dir : dirsToSearch) { FileSystem fs = dir.getFileSystem(entries); + // When we have split-update and there are two kinds of delta directories- + // the delta_x_y/ directory one which has only insert events and + // the delete_delta_x_y/ directory which has only the delete events. + // The clever thing about this kind of splitting is that everything in the delta_x_y/ + // directory can be processed as base files. However, this is left out currently + // as an improvement for the future. - // If this is a base or delta directory, then we need to be looking for the bucket files. - // But if it's a legacy file then we need to add it directly. if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || - dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + dir.getName().startsWith(AcidUtils.DELTA_PREFIX) || + dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); + FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); for(FileStatus f : files) { // For each file, figure out which bucket it is. @@ -519,6 +534,8 @@ public String toString() { addFileToMap(matcher, dir, true, splitToBucketMap); } } + + List splits = new ArrayList(splitToBucketMap.size()); for (Map.Entry e : splitToBucketMap.entrySet()) { BucketTracker bt = e.getValue(); @@ -613,7 +630,8 @@ public float getProgress() throws IOException { implements Mapper { JobConf jobConf; - RecordWriter writer; + RecordWriter writer = null; + RecordWriter deleteEventWriter = null; @Override public void map(WritableComparable key, CompactorInputSplit split, @@ -636,10 +654,30 @@ public void map(WritableComparable key, CompactorInputSplit split, RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); getWriter(reporter, reader.getObjectInspector(), split.getBucket()); + + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(jobConf); + + if (!isMajor && acidOperationalProperties.isSplitUpdate()) { + // When split-update is enabled for ACID, we initialize a separate deleteEventWriter + // that is used to write all the delete events (in case of minor compaction only). For major + // compaction, history is not required to be maintained hence the delete events are processed + // but not re-written separately. 
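The map() change below routes each compacted row by two flags: whether this is a major compaction and whether the row is a delete event. A small sketch of that decision, with the writers replaced by lists; the delete writer is only non-null for a minor compaction on a split-update table.

import java.util.ArrayList;
import java.util.List;

public class CompactorRoutingSketch {
  static void route(boolean isMajor, boolean isDelete,
      List<String> writer, List<String> deleteWriter, String row) {
    if (isMajor && isDelete) {
      return;                      // major compaction drops delete events entirely
    }
    if (isDelete && deleteWriter != null) {
      deleteWriter.add(row);       // minor compaction + split-update: keep deletes separate
    } else {
      writer.add(row);             // everything else goes to the regular compacted delta
    }
  }

  public static void main(String[] args) {
    List<String> delta = new ArrayList<>();
    List<String> deleteDelta = new ArrayList<>();
    route(false, false, delta, deleteDelta, "insert-row"); // -> delta
    route(false, true, delta, deleteDelta, "delete-row");  // -> deleteDelta
    route(true, true, delta, deleteDelta, "delete-row");   // dropped by major compaction
    System.out.println(delta);       // [insert-row]
    System.out.println(deleteDelta); // [delete-row]
  }
}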
+ getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket()); + } + while (reader.next(identifier, value)) { - if (isMajor && reader.isDelete(value)) continue; - writer.write(value); - reporter.progress(); + boolean sawDeleteRecord = reader.isDelete(value); + if (isMajor && sawDeleteRecord) continue; + if (sawDeleteRecord && deleteEventWriter != null) { + // When minor compacting, write delete events to a separate file when split-update is + // turned on. + deleteEventWriter.write(value); + reporter.progress(); + } else { + writer.write(value); + reporter.progress(); + } } } @@ -653,6 +691,9 @@ public void close() throws IOException { if (writer != null) { writer.close(false); } + if (deleteEventWriter != null) { + deleteEventWriter.close(false); + } } private void getWriter(Reporter reporter, ObjectInspector inspector, @@ -679,6 +720,30 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, } } + private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, + int bucket) throws IOException { + if (deleteEventWriter == null) { + AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); + options.inspector(inspector) + .writingBase(false) + .writingDeleteDelta(true) // this is the option which will make it a delete writer + .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) + .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) + .reporter(reporter) + .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(bucket) + .statementId(-1);//setting statementId == -1 makes compacted delta files use + //delta_xxxx_yyyy format + + // Instantiate the underlying output format + @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class + AcidOutputFormat aof = + instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); + + deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); + } + } } static class StringableList extends ArrayList { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index af192fb..efcde2b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -70,18 +70,19 @@ * specifically the tests; the supporting code here is just a clone of TestTxnCommands */ public class TestTxnCommands2 { - private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestTxnCommands2.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); - private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; //bucket count for test tables; set it to 1 for easier debugging - private static int BUCKET_COUNT = 2; + protected static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); - private HiveConf hiveConf; - private Driver d; - private static enum Table { + + protected HiveConf hiveConf; + protected Driver d; + protected static enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart"), NONACIDORCTBL("nonAcidOrcTbl"), @@ -127,7 +128,7 @@ public void setUp() throws Exception { 
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); } - private void dropTables() throws Exception { + protected void dropTables() throws Exception { for(Table t : Table.values()) { runStatementOnDriver("drop table if exists " + t); } @@ -731,6 +732,8 @@ public void testNonAcidToAcidConversion3() throws Exception { Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } + + @Test public void testValidTxnsBookkeeping() throws Exception { // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf @@ -859,11 +862,15 @@ private static void checkCompactionState(CompactionsByState expected, Compaction */ @Test public void testInitiatorWithMultipleFailedCompactions() throws Exception { + testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'"); + } + + protected void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception { String tblName = "hive12353"; runStatementOnDriver("drop table if exists " + tblName); runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4); for(int i = 0; i < 5; i++) { //generate enough delta files so that Initiator can trigger auto compaction @@ -1074,11 +1081,15 @@ public static void runHouseKeeperService(HouseKeeperService houseKeeperService, */ @Test public void writeBetweenWorkerAndCleaner() throws Exception { + writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'"); + } + + protected void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception { String tblName = "hive12352"; runStatementOnDriver("drop table if exists " + tblName); runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); //create some data runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')"); @@ -1125,7 +1136,6 @@ public void writeBetweenWorkerAndCleaner() throws Exception { Assert.assertEquals("", expected, runStatementOnDriver("select a,b from " + tblName + " order by a")); } - /** * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found. * When a heartbeat fails, the query should be failed too. 
@@ -1215,17 +1225,59 @@ public void testNoHistory() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); - + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); runCleaner(hiveConf); runStatementOnDriver("select count(*) from " + Table.ACIDTBL); } + + @Test + public void testACIDwithSchemaEvolution() throws Exception { + testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true'"); + } + + protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProperties) throws Exception { + String tblName = "acidWithSchemaEvol"; + runStatementOnDriver("drop table if exists " + tblName); + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); + + // create some data + runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')"); + runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3"); + + // apply schema evolution by adding some columns + runStatementOnDriver("alter table " + tblName + " add columns(c int, d string)"); + + // insert some data in new schema + runStatementOnDriver("insert into " + tblName + " values(4, 'acid', 100, 'orc')," + + "(5, 'llap', 200, 'tez')"); + + // update old data with values for the new schema columns + runStatementOnDriver("update " + tblName + " set d = 'hive' where a <= 3"); + runStatementOnDriver("update " + tblName + " set c = 999 where a <= 3"); + + // read the entire data back and see if did everything right + List rs = runStatementOnDriver("select * from " + tblName + " order by a"); + String[] expectedResult = { "1\tfoo\t999\thive", "2\tbar\t999\thive", "3\tblah\t999\thive", "4\tacid\t100\torc", "5\tllap\t200\ttez" }; + Assert.assertEquals(Arrays.asList(expectedResult), rs); + + // now compact and see if compaction still preserves the data correctness + runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'"); + runWorker(hiveConf); + runCleaner(hiveConf); + rs = runStatementOnDriver("select * from " + tblName + " order by a"); + Assert.assertEquals(Arrays.asList(expectedResult), rs); + } + + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order */ - private List stringifyValues(int[][] rowsIn) { + protected List stringifyValues(int[][] rowsIn) { assert rowsIn.length > 0; int[][] rows = rowsIn.clone(); Arrays.sort(rows, new RowComp()); @@ -1275,7 +1327,7 @@ private String makeValuesClause(int[][] rows) { return sb.toString(); } - private List runStatementOnDriver(String stmt) throws Exception { + protected List runStatementOnDriver(String stmt) throws Exception { CommandProcessorResponse cpr = d.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java new file mode 100644 index 0000000..82f7f92 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -0,0 +1,610 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Same as TestTxnCommands2 but tests ACID tables with 'transactional_properties' set to 'default'. + * This tests whether ACID tables with split-update turned on are working correctly or not + * for the same set of tests when it is turned off. Of course, it also adds a few tests to test + * specific behaviors of ACID tables with split-update turned on. + */ +public class TestTxnCommands3 extends TestTxnCommands2 { + + public TestTxnCommands3() { + super(); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Override + @Before + public void setUp() throws Exception { + tearDown(); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf + .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + TxnDbUtil.setConfValues(hiveConf); + TxnDbUtil.prepDb(); + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { + throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); + } + SessionState.start(new SessionState(hiveConf)); + d = new Driver(hiveConf); + dropTables(); + runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true','transactional_properties'='default')"); + runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true','transactional_properties'='default')"); + runStatementOnDriver("create table " + Table.NONACIDORCTBL + 
"(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); + } + + @Override + @Test + public void testOrcPPD() throws Exception { + final String defaultUnset = "unset"; + String oldSplitStrategyValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset); + // TODO: Setting split strategy as 'BI' is workaround for HIVE-14448 until it is resolved. + hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); + + super.testOrcPPD(); + + // Restore the previous value for split strategy, or unset if not previously set. + if (oldSplitStrategyValue.equals(defaultUnset)) { + hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname); + } else { + hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldSplitStrategyValue); + } + } + + @Override + @Test + public void testOrcNoPPD() throws Exception { + final String defaultUnset = "unset"; + String oldSplitStrategyValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset); + // TODO: Setting split strategy as 'BI' is workaround for HIVE-14448 until it is resolved. + hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); + + super.testOrcNoPPD(); + + // Restore the previous value for split strategy, or unset if not previously set. + if (oldSplitStrategyValue.equals(defaultUnset)) { + hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname); + } else { + hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldSplitStrategyValue); + } + } + + @Override + @Test + public void testInitiatorWithMultipleFailedCompactions() throws Exception { + // Test with split-update turned on. + testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); + } + + @Override + @Test + public void writeBetweenWorkerAndCleaner() throws Exception { + writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); + } + + @Override + @Test + public void testACIDwithSchemaEvolution() throws Exception { + testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); + } + + /** + * In current implementation of ACID, altering the value of transactional_properties or trying to + * set a value for previously unset value for an acid table will throw an exception. + * @throws Exception + */ + @Test + public void testFailureOnAlteringTransactionalProperties() throws Exception { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); + runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); + } + + /** + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table with split-update enabled + * 3. Insert a row to ACID table + * 4. Perform Major compaction + * 5. 
Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidSplitUpdateConversion1() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. Insert another row to newly-converted ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. + // The delta directory should also have only 1 bucket file (bucket_00001) + Assert.assertEquals(3, status.length); + boolean sawNewDelta = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, buckets.length); // only one bucket file + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertTrue(sawNewDelta); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_xxxxxxx. 
+ // Original bucket files and delta directory should stay until Cleaner kicks in. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(4, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Let Cleaner delete obsolete files/dirs + // Note, here we create a fake directory along with fake files as original directories/files + String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + + "/subdir/000000_0"; + String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + + "/subdir/000000_1"; + fs.create(new Path(fakeFile0)); + fs.create(new Path(fakeFile1)); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 5 items: + // 2 original files, 1 original directory, 1 base directory and 1 delta directory + Assert.assertEquals(5, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + // Original bucket files and delta directory should have been cleaned up. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } + + /** + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table with split update enabled. + * 3. Update the existing row in ACID table + * 4. Perform Major compaction + * 5. Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidSplitUpdateConversion2() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. 
Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. Update the existing row in newly-converted ACID table + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory + // and one delete_delta directory. When split-update is enabled, an update event is split into + // a combination of a delete and an insert; the delete generates the delete_delta directory. + // The delta directory should contain a single bucket file, and so should + // the delete_delta directory. 
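The comment above is the heart of split-update: an UPDATE is rewritten as a delete event (written to a delete_delta directory) and an insert event (written to a regular delta). As a rough illustration of how a reader can apply such delete events, here is a minimal, self-contained Java sketch; it is not Hive code, and the RowKey, Row, and applyDeleteEvents names are invented for the example. Rows are keyed by (original transaction id, bucket, row id), and any row whose key appears in a delete event is dropped.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Minimal sketch (not Hive code): apply delete events from delete_delta files
// to rows read from base/delta files. Row identity is (origTxn, bucket, rowId).
public class SplitUpdateSketch {
  static final class RowKey {
    final long origTxn; final int bucket; final long rowId;
    RowKey(long origTxn, int bucket, long rowId) {
      this.origTxn = origTxn; this.bucket = bucket; this.rowId = rowId;
    }
    @Override public boolean equals(Object o) {
      if (!(o instanceof RowKey)) return false;
      RowKey k = (RowKey) o;
      return origTxn == k.origTxn && bucket == k.bucket && rowId == k.rowId;
    }
    @Override public int hashCode() { return Objects.hash(origTxn, bucket, rowId); }
  }

  static final class Row {
    final RowKey key; final String payload;
    Row(RowKey key, String payload) { this.key = key; this.payload = payload; }
  }

  // Drop every row whose key appears in a delete event.
  static List<Row> applyDeleteEvents(List<Row> rows, Set<RowKey> deleteEvents) {
    return rows.stream()
        .filter(r -> !deleteEvents.contains(r.key))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row(new RowKey(1, 1, 0), "a=1,b=2"),   // original version of the row
        new Row(new RowKey(2, 1, 0), "a=1,b=3"));  // re-inserted by the UPDATE (delta)
    Set<RowKey> deletes = new HashSet<>();
    deletes.add(new RowKey(1, 1, 0));              // delete event from the delete_delta
    applyDeleteEvents(rows, deletes)
        .forEach(r -> System.out.println(r.payload)); // prints only a=1,b=3
  }
}
```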
+ Assert.assertEquals(4, status.length); + boolean sawNewDelta = false; + boolean sawNewDeleteDelta = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + } else if (status[i].getPath().getName().matches("delete_delta_.*")) { + sawNewDeleteDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertTrue(sawNewDelta); + Assert.assertTrue(sawNewDeleteDelta); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_0000001. + // Original bucket files and delta directory should stay until Cleaner kicks in. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(5, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Let Cleaner delete obsolete files/dirs + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 5 items: + // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory + Assert.assertEquals(5, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_0000001. + // Original bucket files, delta directory and delete_delta should have been cleaned up. 
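These conversion tests assert directory names such as 000000_0, base_0000002, delta_0000001_0000001_0000, and the new delete_delta_* layout throughout. As a quick reference for how those names decompose, the sketch below parses them into a type and transaction range; the regular expression and the describe helper are illustrative only and are not Hive's AcidUtils parsing code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: parse ACID directory names into (type, minTxn, maxTxn).
// The patterns mirror the names asserted in the tests above; the helper
// names are illustrative, not Hive's AcidUtils API.
public class AcidDirNameSketch {
  private static final Pattern DIR = Pattern.compile(
      "(base)_(-?\\d+)|(delete_delta|delta)_(\\d+)_(\\d+)(?:_(\\d+))?");

  static String describe(String name) {
    Matcher m = DIR.matcher(name);
    if (!m.matches()) {
      return name + " -> original (pre-ACID) file or unknown";
    }
    if (m.group(1) != null) {
      return name + " -> base up to txn " + Long.parseLong(m.group(2));
    }
    long min = Long.parseLong(m.group(4));
    long max = Long.parseLong(m.group(5));
    String stmt = m.group(6) == null ? "" : ", statement " + Integer.parseInt(m.group(6));
    return name + " -> " + m.group(3) + " for txns [" + min + ", " + max + "]" + stmt;
  }

  public static void main(String[] args) {
    System.out.println(describe("base_0000002"));
    System.out.println(describe("delta_0000001_0000001_0000"));
    System.out.println(describe("delete_delta_0000001_0000001_0000"));
    System.out.println(describe("000001_0"));
  }
}
```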
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } + + /** + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. + * 1. Insert a row to Non-ACID table + * 2. Convert Non-ACID to ACID table with split-update enabled + * 3. Perform Major compaction + * 4. Insert a new row to ACID table + * 5. Perform another Major compaction + * 6. Clean + * @throws Exception + */ + @Test + public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert a row to Non-ACID table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files in the location (000000_0 and 000001_0) + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + int [][] resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + int resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default) + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Everything should be same as before + Assert.assertEquals(BUCKET_COUNT, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 3. Perform a major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new directory: base_-9223372036854775808 + // Original bucket files should stay until Cleaner kicks in. 
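The expected name base_-9223372036854775808 checked in the next assertions is just the base prefix followed by Long.MIN_VALUE, the transaction id used when a major compaction rolls up only original, pre-ACID files. Assuming the zero-padded format suggested by the asserted names (base_0000002 versus the unpadded negative value), a formatter like the one below reproduces both strings; baseDirName is a hypothetical helper, not Hive's API.

```java
// Illustrative only: a formatter consistent with the names asserted in these
// tests ("base_0000002" and "base_-9223372036854775808", i.e. Long.MIN_VALUE,
// which is used when compacting only original, pre-ACID files).
public class BaseNameSketch {
  static String baseDirName(long txnId) {            // hypothetical helper
    return String.format("base_%07d", txnId);
  }

  public static void main(String[] args) {
    System.out.println(baseDirName(2L));              // base_0000002
    System.out.println(baseDirName(Long.MIN_VALUE));  // base_-9223372036854775808
  }
}
```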
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(3, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } + Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 2}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 1; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 4. Update the existing row, and insert another row to newly-converted ACID table + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 + // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, + // plus two new delta directories and one delete_delta directory that would be created due to + // the update statement (remember split-update U=D+I)! + Assert.assertEquals(6, status.length); + int numDelta = 0; + int numDeleteDelta = 0; + sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("delta_.*")) { + numDelta++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numDelta == 1) { + Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else if (numDelta == 2) { + Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } else if (status[i].getPath().getName().matches("delete_delta_.*")) { + numDeleteDelta++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numDeleteDelta == 1) { + Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } else if (status[i].getPath().getName().matches("base_.*")) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else { + Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); + } + } + Assert.assertEquals(2, 
numDelta); + Assert.assertEquals(1, numDeleteDelta); + Assert.assertTrue(sawNewBase); + + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 5. Perform another major compaction + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + // There should be 1 new base directory: base_0000002 + // Original bucket files, delta directories, delete_delta directories and the + // previous base directory should stay until Cleaner kicks in. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(status); + Assert.assertEquals(7, status.length); + int numBase = 0; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + numBase++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numBase == 1) { + Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } else if (numBase == 2) { + // The new base dir has a single bucket file (bucket_00001), like the deltas it compacts + Assert.assertEquals("base_0000002", status[i].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } + } + Assert.assertEquals(2, numBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + + // 6. Let Cleaner delete obsolete files/dirs + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 7 items: + // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories + Assert.assertEquals(7, status.length); + runCleaner(hiveConf); + // There should be only 1 directory left: base_0000002. + // Original bucket files, delta/delete_delta directories and the previous base directory should have been cleaned up. 
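The before-and-after counts in steps 5 and 6 follow one rule that the Cleaner-related assertions keep exercising: once the newest base covers a transaction id, every older base, every delta or delete_delta whose range ends at or below that id, and the original bucket files become obsolete. The following self-contained sketch applies that rule to the directory names from this test; it only illustrates the expected outcome and is not Hive's Cleaner implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: decide which ACID directories could be dropped once a newer
// base exists. This is not Hive's Cleaner; it just mirrors the expectations
// asserted in step 6 above.
public class CleanerSketch {
  private static final Pattern BASE = Pattern.compile("base_(-?\\d+)");
  private static final Pattern DELTA = Pattern.compile("(?:delete_)?delta_(\\d+)_(\\d+)(?:_\\d+)?");

  static List<String> obsolete(List<String> dirs) {
    long bestBase = Long.MIN_VALUE;
    for (String d : dirs) {
      Matcher m = BASE.matcher(d);
      if (m.matches()) bestBase = Math.max(bestBase, Long.parseLong(m.group(1)));
    }
    List<String> out = new ArrayList<>();
    for (String d : dirs) {
      Matcher base = BASE.matcher(d);
      Matcher delta = DELTA.matcher(d);
      if (base.matches()) {
        if (Long.parseLong(base.group(1)) < bestBase) out.add(d);   // an older base
      } else if (delta.matches()) {
        if (Long.parseLong(delta.group(2)) <= bestBase) out.add(d); // covered delta/delete_delta
      } else {
        out.add(d);                                                 // original bucket file
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> dirs = Arrays.asList("000000_0", "000001_0",
        "base_-9223372036854775808", "delta_0000001_0000001_0000",
        "delete_delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "base_0000002");
    // Everything except base_0000002 is reported obsolete, matching the single
    // directory left after the Cleaner runs in step 6.
    obsolete(dirs).forEach(System.out::println);
  }
}
```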
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertEquals("base_0000002", status[0].getPath().getName()); + FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + resultData = new int[][] {{1, 3}, {3, 4}}; + Assert.assertEquals(stringifyValues(resultData), rs); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + resultCount = 2; + Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 556df18..a7ff9a3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -17,11 +17,21 @@ */ package org.apache.hadoop.hive.ql.io; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem; @@ -30,13 +40,6 @@ import org.junit.Assert; import org.junit.Test; -import java.io.IOException; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class TestAcidUtils { @Test @@ -60,12 +63,23 @@ public void testCreateFilename() throws Exception { options.writingBase(false); assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023", AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(true); + assertEquals("/tmp/delete_delta_0000100_0000200_0000/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(false); options.statementId(-1); assertEquals("/tmp/delta_0000100_0000200/bucket_00023", AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(true); + assertEquals("/tmp/delete_delta_0000100_0000200/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(false); options.statementId(7); assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023", AcidUtils.createFilename(p, options).toString()); + options.writingDeleteDelta(true); + assertEquals("/tmp/delete_delta_0000100_0000200_0007/bucket_00023", + AcidUtils.createFilename(p, options).toString()); } @Test public void testCreateFilenameLargeIds() throws Exception { @@ -86,7 +100,6 @@ public void testCreateFilenameLargeIds() throws Exception { assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023", AcidUtils.createFilename(p, options).toString()); } - @Test 
public void testParsing() throws Exception { @@ -94,19 +107,34 @@ public void testParsing() throws Exception { Path dir = new Path("/tmp/tbl"); Configuration conf = new Configuration(); AcidOutputFormat.Options opts = - AcidUtils.parseBaseBucketFilename(new Path(dir, "base_567/bucket_123"), + AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"), conf); assertEquals(false, opts.getOldStyle()); assertEquals(true, opts.isWritingBase()); assertEquals(567, opts.getMaximumTransactionId()); assertEquals(0, opts.getMinimumTransactionId()); assertEquals(123, opts.getBucket()); - opts = AcidUtils.parseBaseBucketFilename(new Path(dir, "000123_0"), conf); + opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delta_000005_000006/bucket_00001"), + conf); + assertEquals(false, opts.getOldStyle()); + assertEquals(false, opts.isWritingBase()); + assertEquals(6, opts.getMaximumTransactionId()); + assertEquals(5, opts.getMinimumTransactionId()); + assertEquals(1, opts.getBucket()); + opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delete_delta_000005_000006/bucket_00001"), + conf); + assertEquals(false, opts.getOldStyle()); + assertEquals(false, opts.isWritingBase()); + assertEquals(6, opts.getMaximumTransactionId()); + assertEquals(5, opts.getMinimumTransactionId()); + assertEquals(1, opts.getBucket()); + opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf); assertEquals(true, opts.getOldStyle()); assertEquals(true, opts.isWritingBase()); assertEquals(123, opts.getBucket()); assertEquals(0, opts.getMinimumTransactionId()); assertEquals(0, opts.getMaximumTransactionId()); + } @Test @@ -471,5 +499,230 @@ public void deltasWithOpenTxnsNotInCompact2() throws Exception { assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); } + @Test + public void testBaseWithDeleteDeltas() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_49/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_025_025/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delta_029_029/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_029_029/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delta_025_030/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_025_030/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delta_050_105/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_050_105/bucket_0", 0, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0])); + AcidUtils.Directory dir = + AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs, + "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); + assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); + List obsolete = dir.getObsolete(); + assertEquals(7, obsolete.size()); + assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).getPath().toString()); + 
assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).getPath().toString()); + assertEquals(0, dir.getOriginalFiles().size()); + List deltas = dir.getCurrentDirectories(); + assertEquals(2, deltas.size()); + assertEquals("mock:/tbl/part1/delete_delta_050_105", deltas.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_050_105", deltas.get(1).getPath().toString()); + // The delete_delta_110_110 should not be read because it is greater than the high watermark. + } + + @Test + public void testOverlapingDeltaAndDeleteDelta() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_0000063_63/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_000062_62/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_00061_61/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_00064_64/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_40_60/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_052_55/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); + assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); + List obsolete = dir.getObsolete(); + assertEquals(3, obsolete.size()); + assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).getPath().toString()); + List delts = dir.getCurrentDirectories(); + assertEquals(6, delts.size()); + assertEquals("mock:/tbl/part1/delete_delta_40_60", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_40_60", delts.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_00061_61", delts.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(4).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_00064_64", delts.get(5).getPath().toString()); + } -} + @Test + public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exception { + // This test checks that if we have a minor compacted delta for the txn range [40,60] + // then it will make any delete delta in that range as obsolete. 
+ Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidUtils.AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_50_50/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); + List obsolete = dir.getObsolete(); + assertEquals(1, obsolete.size()); + assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).getPath().toString()); + List delts = dir.getCurrentDirectories(); + assertEquals(1, delts.size()); + assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString()); + } + + @Test + public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception { + // This test checks that the appropriate delta and delete_delta directories are included when a + // minor compaction specifies a valid open txn range. + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidUtils.AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_2_2/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500, + new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_7_7/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = + AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + Long.MAX_VALUE + ":")); + List delts = dir.getCurrentDirectories(); + assertEquals(2, delts.size()); + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_2_2", delts.get(1).getPath().toString()); + } + + @Test + public void deleteDeltasWithOpenTxnInRead() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidUtils.AcidOperationalProperties.getDefault().toInt()); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delete_delta_3_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4")); + List delts = dir.getCurrentDirectories(); + assertEquals(3, delts.size()); + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_2_5", delts.get(1).getPath().toString()); + 
assertEquals("mock:/tbl/part1/delta_2_5", delts.get(2).getPath().toString()); + // Note that delete_delta_3_3 should not be read, when a minor compacted + // [delete_]delta_2_5 is present. + } + + @Test + public void testDeleteDeltaSubdirPathGeneration() throws Exception { + String deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10); + assertEquals("delete_delta_0000001_0000010", deleteDeltaSubdirPath); + deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10, 5); + assertEquals("delete_delta_0000001_0000010_0005", deleteDeltaSubdirPath); + } + + @Test + public void testDeleteEventDeltaDirPathFilter() throws Exception { + Path positivePath = new Path("delete_delta_000001_000010"); + Path negativePath = new Path("delta_000001_000010"); + assertEquals(true, AcidUtils.deleteEventDeltaDirFilter.accept(positivePath)); + assertEquals(false, AcidUtils.deleteEventDeltaDirFilter.accept(negativePath)); + } + + @Test + public void testAcidOperationalProperties() throws Exception { + AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy(); + assertsForAcidOperationalProperties(testObj, "legacy"); + + testObj = AcidUtils.AcidOperationalProperties.getDefault(); + assertsForAcidOperationalProperties(testObj, "default"); + + testObj = AcidUtils.AcidOperationalProperties.parseInt(0); + assertsForAcidOperationalProperties(testObj, "legacy"); + + testObj = AcidUtils.AcidOperationalProperties.parseInt(1); + assertsForAcidOperationalProperties(testObj, "split_update"); + + testObj = AcidUtils.AcidOperationalProperties.parseString("legacy"); + assertsForAcidOperationalProperties(testObj, "legacy"); + + testObj = AcidUtils.AcidOperationalProperties.parseString("default"); + assertsForAcidOperationalProperties(testObj, "default"); + + } + + private void assertsForAcidOperationalProperties(AcidUtils.AcidOperationalProperties testObj, + String type) throws Exception { + switch(type) { + case "split_update": + case "default": + assertEquals(true, testObj.isSplitUpdate()); + assertEquals(false, testObj.isHashBasedMerge()); + assertEquals(1, testObj.toInt()); + assertEquals("|split_update", testObj.toString()); + break; + case "legacy": + assertEquals(false, testObj.isSplitUpdate()); + assertEquals(false, testObj.isHashBasedMerge()); + assertEquals(0, testObj.toInt()); + assertEquals("", testObj.toString()); + break; + default: + break; + } + } + + @Test + public void testAcidOperationalPropertiesSettersAndGetters() throws Exception { + AcidUtils.AcidOperationalProperties oprProps = AcidUtils.AcidOperationalProperties.getDefault(); + Configuration testConf = new Configuration(); + // Test setter for configuration object. + AcidUtils.setAcidOperationalProperties(testConf, oprProps); + assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0)); + // Test getter for configuration object. + assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString()); + + Map parameters = new HashMap(); + // Test setter for map object. + AcidUtils.setAcidOperationalProperties(parameters, oprProps); + assertEquals(oprProps.toString(), + parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname)); + // Test getter for map object. + // Calling a get on the 'parameters' will still return legacy type because the setters/getters + // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES + // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES. 
+ assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt()); + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString()); + // Set the appropriate key in the map and test that we are able to read it back correctly. + assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt()); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 6648829..3470263 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -768,7 +768,8 @@ public void testEtlCombinedStrategy() throws Exception { MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategy(combineCtx, context, - adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true); + adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, adi.parsedDeltas, + null, null, true); } public OrcInputFormat.AcidDirInfo createAdi( @@ -781,7 +782,8 @@ public void testEtlCombinedStrategy() throws Exception { OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategy( - null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true); + null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, adi.parsedDeltas, + null, null, true); } public static class MockBlock {
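The comment in the last TestAcidUtils test above describes a deliberate asymmetry: the map-based setter writes the session-level key (HIVE_TXN_OPERATIONAL_PROPERTIES) while the map-based getter reads the table-level key (TABLE_TRANSACTIONAL_PROPERTIES), so a freshly set map still reads back as legacy until transactional_properties itself is populated. The sketch below reproduces that behavior with a plain HashMap; the literal key strings are my reading of those two constants and the class is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the key asymmetry the test documents: the setter writes the
// session-level key while the map getter reads the table-level key.
// The literal strings below are assumed values of the two constants.
public class PropertyKeyAsymmetrySketch {
  static final String SESSION_KEY = "hive.txn.operational.properties"; // HIVE_TXN_OPERATIONAL_PROPERTIES
  static final String TABLE_KEY = "transactional_properties";          // TABLE_TRANSACTIONAL_PROPERTIES

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put(SESSION_KEY, "|split_update");   // what the map setter writes
    System.out.println(params.get(TABLE_KEY));  // null -> getter falls back to legacy (0)
    params.put(TABLE_KEY, "default");           // what CREATE/ALTER ... TBLPROPERTIES sets
    System.out.println(params.get(TABLE_KEY));  // "default" -> parsed as split-update (1)
  }
}
```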