diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 761dbb2..57a6084 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1687,7 +1687,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Set this to true so that when attempt to acquire a lock on resource times out, the current state" + " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), - + + HIVE_ACID_OPERATIONAL_PROPERTIES("hive.acid.operational.properties", 0, + "Operational properties that can be used to choose the appropriate set of readers/writers" + + "for various types of allowed Acid files."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + "rejected, until this number goes below the limit."), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 14f7316..fddab85 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -133,6 +133,9 @@ public void configureInputJobProperties(TableDesc tableDesc, boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties); AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable); + AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(tableProperties); + AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties); } } catch (IOException e) { throw new IllegalStateException("Failed to set output path", e); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 0c6b9ea..8503702 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Properties; public abstract class AbstractRecordWriter implements RecordWriter { @@ -238,10 +239,16 @@ public void closeBatch() throws StreamingIOFailure { private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { + // Initialize table properties from the table parameters. This is required because the table + // may define certain table parameters that may be required while writing. The table parameter + // 'transactional_properties' is one such example. 
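The new hive.acid.operational.properties entry introduced in HiveConf above is an integer bit mask; a minimal usage sketch of how it is meant to be round-tripped (this assumes a build that already contains this patch; the class name and the example value 1 are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class AcidOperationalPropertiesConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 0 is the declared default; later in this patch AcidUtils decodes 0 as the legacy behavior.
    HiveConf.setIntVar(conf, ConfVars.HIVE_ACID_OPERATIONAL_PROPERTIES, 1);
    int encoded = HiveConf.getIntVar(conf, ConfVars.HIVE_ACID_OPERATIONAL_PROPERTIES);
    System.out.println("hive.acid.operational.properties = " + encoded);
  }
}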
+ Properties tblProperties = new Properties(); + tblProperties.putAll(tbl.getParameters()); return outf.getRecordUpdater(partitionPath, new AcidOutputFormat.Options(conf) .inspector(getSerde().getObjectInspector()) .bucket(bucketId) + .tableProperties(tblProperties) .minimumTransactionId(minTxnId) .maximumTransactionId(maxTxnID) .statementId(-1) diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 4d92b73..84cc4f6 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -1474,5 +1474,6 @@ const string FILE_OUTPUT_FORMAT = "file.outputformat", const string META_TABLE_STORAGE = "storage_handler", const string TABLE_IS_TRANSACTIONAL = "transactional", const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction", +const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties", diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp index f982bf2..1cbd176 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp @@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() { TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } }}} // namespace diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h index ae14bd1..3d068c3 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h @@ -38,6 +38,7 @@ class hive_metastoreConstants { std::string META_TABLE_STORAGE; std::string TABLE_IS_TRANSACTIONAL; std::string TABLE_NO_AUTO_COMPACT; + std::string TABLE_TRANSACTIONAL_PROPERTIES; }; extern const hive_metastoreConstants g_hive_metastore_constants; diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index 5a666f2..8de8896 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -82,4 +82,6 @@ public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index f505208..10b1d97 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -18842,6 +18842,7 @@ final class Constant extends \Thrift\Type\TConstant { static protected $META_TABLE_STORAGE; static protected $TABLE_IS_TRANSACTIONAL; static protected $TABLE_NO_AUTO_COMPACT; + static protected $TABLE_TRANSACTIONAL_PROPERTIES; static protected function init_DDL_TIME() { return "transient_lastDdlTime"; @@ -18934,6 +18935,10 @@ final class Constant extends \Thrift\Type\TConstant { static protected function init_TABLE_NO_AUTO_COMPACT() { return "no_auto_compaction"; } + + static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() { + return "transactional_properties"; + } } diff --git metastore/src/gen/thrift/gen-py/hive_metastore/constants.py 
metastore/src/gen/thrift/gen-py/hive_metastore/constants.py index d1c07a5..5100236 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/constants.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/constants.py @@ -32,3 +32,4 @@ META_TABLE_STORAGE = "storage_handler" TABLE_IS_TRANSACTIONAL = "transactional" TABLE_NO_AUTO_COMPACT = "no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties" diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb index eeccc84..6aa7143 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb @@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional" TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 3e74675..524d77e 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -17,20 +17,29 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - final class TransactionalValidationListener extends MetaStorePreEventListener { public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class); + + // Mirrored from org.apache.hadoop.hive.ql.io.AcidUtils. We could have imported the constant but + // that would create a cyclic dependency between hive.metastore and hive.ql, hence is duplicated. 
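The listener gains an initializeTransactionalProperties step below that normalizes the new transactional_properties parameter; as a stripped-down, stand-alone sketch of that normalization (plain java.util collections only; the class and method names here are made up for illustration):

import java.util.HashMap;
import java.util.Map;

public class TransactionalPropsNormalizerSketch {
  static final String KEY = "transactional_properties";
  static final String DEFAULT_VALUE_STRING = "default";

  // Replaces any case variant of the key with the canonical lower-case key and falls back to
  // "default" when the parameter is absent, mirroring the normalization performed below.
  static Map<String, String> normalize(Map<String, String> parameters) {
    String value = DEFAULT_VALUE_STRING;
    for (Map.Entry<String, String> e : new HashMap<>(parameters).entrySet()) {
      if (KEY.equalsIgnoreCase(e.getKey())) {
        value = e.getValue();
        parameters.remove(e.getKey());
        break;
      }
    }
    parameters.put(KEY, value);
    return parameters;
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("Transactional_Properties", "split_update");
    System.out.println(normalize(params)); // {transactional_properties=split_update}
  }
}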
+ public static final String DEFAULT_VALUE_STRING = "default"; TransactionalValidationListener(Configuration conf) { super(conf); @@ -80,6 +89,7 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw if (transactionalValuePresent) { //normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue); + initializeTransactionalProperties(newTable); } if ("true".equalsIgnoreCase(transactionalValue)) { if (!conformToAcid(newTable)) { @@ -157,6 +167,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + initializeTransactionalProperties(newTable); return; } @@ -187,4 +198,38 @@ private boolean conformToAcid(Table table) throws MetaException { return true; } + + private void initializeTransactionalProperties(Table table) throws MetaException { + // All new versions of Acid tables created after the introduction of Acid version/type system + // must have TRANSACTIONAL_PROPERTIES property defined, otherwise they will be assumed of + // legacy type. + + // Initialize transaction table properties with default string value. + String tableTransactionalProperties = DEFAULT_VALUE_STRING; + + Map parameters = table.getParameters(); + if (parameters != null) { + Set keys = new HashSet<>(parameters.keySet()); + for (String key : keys) { + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + tableTransactionalProperties = parameters.get(key); + parameters.remove(key); + String validationError = validateTransactionalProperties(tableTransactionalProperties); + if (validationError != null) { + throw new MetaException("Invalid transactional properties specified for the " + + "table with the error " + validationError); + } + break; + } + } + } + + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + tableTransactionalProperties); + } + + private String validateTransactionalProperties(String transactionalProperties) { + // TODO: Implement validation logic for transactional properties + return null; // All checks passed, return null. 
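validateTransactionalProperties above is left as a TODO; one plausible shape for it, accepting only "default", "legacy", or a '|'-separated combination of the split_update / hash_merge tokens defined later in AcidUtils, might be the following (a sketch of an assumed contract, not the committed behavior):

public class TransactionalPropertiesValidatorSketch {
  // Returns null when valid, otherwise an error message, mirroring the TODO's contract above.
  static String validate(String transactionalProperties) {
    if (transactionalProperties == null || transactionalProperties.isEmpty()) {
      return "transactional_properties must not be empty";
    }
    if ("default".equalsIgnoreCase(transactionalProperties)
        || "legacy".equalsIgnoreCase(transactionalProperties)) {
      return null;
    }
    for (String option : transactionalProperties.split("\\|")) {
      if (option.isEmpty()) {
        continue; // tolerate a leading or trailing separator
      }
      if (!"split_update".equalsIgnoreCase(option) && !"hash_merge".equalsIgnoreCase(option)) {
        return "unknown option '" + option + "' in transactional_properties";
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(validate("default"));                  // null
    System.out.println(validate("split_update|hash_merge"));  // null
    System.out.println(validate("bogus"));                    // error message
  }
}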
+ } } \ No newline at end of file diff --git orc/src/java/org/apache/orc/impl/SchemaEvolution.java orc/src/java/org/apache/orc/impl/SchemaEvolution.java index 2c80aaa..fc1f073 100644 --- orc/src/java/org/apache/orc/impl/SchemaEvolution.java +++ orc/src/java/org/apache/orc/impl/SchemaEvolution.java @@ -38,6 +38,12 @@ private final TypeDescription readerSchema; private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); + private enum SchemaType { + NON_ACID_SCHEMA, + SIMPLE_ACID_SCHEMA, + SPLIT_UPDATE_ACID_SCHEMA; + } + public SchemaEvolution(TypeDescription readerSchema, boolean[] included) { this.included = included; readerToFile = null; @@ -49,10 +55,16 @@ public SchemaEvolution(TypeDescription fileSchema, boolean[] included) throws IOException { readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1); this.included = included; - if (checkAcidSchema(fileSchema)) { - this.readerSchema = createEventSchema(readerSchema); - } else { - this.readerSchema = readerSchema; + SchemaType schemaType = getSchemaType(fileSchema); + switch (schemaType) { + case SIMPLE_ACID_SCHEMA: + this.readerSchema = createSimpleEventAcidSchema(readerSchema); + break; + case SPLIT_UPDATE_ACID_SCHEMA: + this.readerSchema = createSplitUpdateEventAcidSchema(readerSchema); + break; + default: + this.readerSchema = readerSchema; } buildMapping(fileSchema, this.readerSchema); } @@ -153,21 +165,23 @@ void buildMapping(TypeDescription fileType, } } - private static boolean checkAcidSchema(TypeDescription type) { + private static SchemaType getSchemaType(TypeDescription type) { if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { List rootFields = type.getFieldNames(); - if (acidEventFieldNames.equals(rootFields)) { - return true; + if (acidSimpleEventFieldNames.equals(rootFields)) { + return SchemaType.SIMPLE_ACID_SCHEMA; + } else if (acidSplitUpdateEventFieldNames.equals(rootFields)) { + return SchemaType.SPLIT_UPDATE_ACID_SCHEMA; } } - return false; + return SchemaType.NON_ACID_SCHEMA; } /** - * @param typeDescr + * @param typeDescr TypeDescription corresponding to an ACID event * @return ORC types for the ACID event based on the row's type description */ - public static TypeDescription createEventSchema(TypeDescription typeDescr) { + public static TypeDescription createSimpleEventAcidSchema(TypeDescription typeDescr) { TypeDescription result = TypeDescription.createStruct() .addField("operation", TypeDescription.createInt()) .addField("originalTransaction", TypeDescription.createLong()) @@ -178,13 +192,48 @@ public static TypeDescription createEventSchema(TypeDescription typeDescr) { return result; } - public static final List acidEventFieldNames= new ArrayList(); + /** + * @param typeDescr TypeDescription corresponding to an ACID event + * @return ORC types for the ACID event based on the row's type description + */ + public static TypeDescription createSplitUpdateEventAcidSchema(TypeDescription typeDescr) { + TypeDescription result = TypeDescription.createStruct() + .addField("operation", TypeDescription.createInt()) + .addField("originalTransaction", TypeDescription.createLong()) + .addField("bucket", TypeDescription.createInt()) + .addField("rowId", TypeDescription.createLong()) + .addField("currentTransaction", TypeDescription.createLong()) + .addField("row", typeDescr.clone()) + .addField("deletedOriginalTransaction", TypeDescription.createLong()) + .addField("deletedBucket", TypeDescription.createInt()) + .addField("deletedRowId", TypeDescription.createLong()) + 
.addField("deletedCurrentTransaction", TypeDescription.createLong()); + return result; + } + + public static final List acidSimpleEventFieldNames = new ArrayList(); + + static { + acidSimpleEventFieldNames.add("operation"); + acidSimpleEventFieldNames.add("originalTransaction"); + acidSimpleEventFieldNames.add("bucket"); + acidSimpleEventFieldNames.add("rowId"); + acidSimpleEventFieldNames.add("currentTransaction"); + acidSimpleEventFieldNames.add("row"); + } + + public static final List acidSplitUpdateEventFieldNames = new ArrayList(); + static { - acidEventFieldNames.add("operation"); - acidEventFieldNames.add("originalTransaction"); - acidEventFieldNames.add("bucket"); - acidEventFieldNames.add("rowId"); - acidEventFieldNames.add("currentTransaction"); - acidEventFieldNames.add("row"); + acidSplitUpdateEventFieldNames.add("operation"); + acidSplitUpdateEventFieldNames.add("originalTransaction"); + acidSplitUpdateEventFieldNames.add("bucket"); + acidSplitUpdateEventFieldNames.add("rowId"); + acidSplitUpdateEventFieldNames.add("currentTransaction"); + acidSplitUpdateEventFieldNames.add("row"); + acidSplitUpdateEventFieldNames.add("deletedOriginalTransaction"); + acidSplitUpdateEventFieldNames.add("deletedBucket"); + acidSplitUpdateEventFieldNames.add("deletedRowId"); + acidSplitUpdateEventFieldNames.add("deletedCurrentTransaction"); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index dff1815..1e518ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -79,6 +79,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext HiveInputFormat.pushFilters(job, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } sink = work.getSink(); fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 27b1673..ac17b78 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -205,6 +205,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); ts.passExecContext(getExecContext()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 24bf506..26f3157 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -472,6 +472,7 @@ private void initializeOperators(Map fetchOpJobConfMap) HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 36f38f6..5f85119 
100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,9 +18,14 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -29,22 +34,17 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. @@ -246,6 +246,167 @@ public static DataOperationType toDataOperationType(Operation op) { throw new IllegalArgumentException("Unexpected Operation: " + op); } } + + public enum AcidEventSchemaType { + SIMPLE_SCHEMA("simple_schema"), + SPLIT_UPDATE_SCHEMA("split_update_schema"); + + private final String acidEventSchemaType; + private AcidEventSchemaType(String acidEventSchemaType) { + this.acidEventSchemaType = acidEventSchemaType; + } + + @Override + public String toString() { + return acidEventSchemaType; + } + } + + public static class AcidOperationalProperties { + private int description = 0x00; + public static final int SPLIT_UPDATE_BIT = 0x01; + public static final String SPLIT_UPDATE_STRING = "split_update"; + public static final int HASH_BASED_MERGE_BIT = 0x02; + public static final String HASH_BASED_MERGE_STRING = "hash_merge"; + public static final String DEFAULT_VALUE_STRING = "default"; + public static final String LEGACY_VALUE_STRING = "legacy"; + + private AcidOperationalProperties() { + } + + /** + * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables + * that were created before ACID type system using operational properties was put in place. + * @return the acidOperationalProperties object + */ + public static AcidOperationalProperties getLegacy() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + // In legacy mode, none of these properties are turned on. + return obj; + } + + /** + * Returns an acidOperationalProperties object that represents default ACID behavior for tables + * that do no explicitly specify/override the default behavior. + * @return the acidOperationalProperties object. 
+ */ + public static AcidOperationalProperties getDefault() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + obj.setSplitUpdate(true); + obj.setHashBasedMerge(false); + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded string. + * @param propertiesStr an encoded string representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseString(String propertiesStr) { + if (propertiesStr == null) { + return null; + } + if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { + return AcidOperationalProperties.getDefault(); + } + if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { + return AcidOperationalProperties.getLegacy(); + } + AcidOperationalProperties obj = new AcidOperationalProperties(); + String[] options = propertiesStr.split("|"); + for (String option : options) { + switch (option) { + case SPLIT_UPDATE_STRING: + obj.setSplitUpdate(true); + break; + case HASH_BASED_MERGE_STRING: + obj.setHashBasedMerge(true); + break; + default: + break; + } + } + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer. + * @param properties an encoded 32-bit representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseInt(int properties) { + AcidOperationalProperties obj = new AcidOperationalProperties(); + if ((properties & SPLIT_UPDATE_BIT) > 0) { + obj.setSplitUpdate(true); + } + if ((properties & HASH_BASED_MERGE_BIT) > 0) { + obj.setHashBasedMerge(true); + } + return obj; + } + + /** + * Sets the split update property for ACID operations based on the boolean argument. + * When split update is turned on, an update ACID event is interpreted as a combination of + * delete event followed by an update event. + * @param isSplitUpdate a boolean property that turns on split update when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) { + description = (isSplitUpdate + ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT)); + return this; + } + + /** + * Sets the hash-based merge property for ACID operations that combines delta files using + * GRACE hash join based approach, when turned on. (Currently unimplemented!) + * @param isHashBasedMerge a boolean property that turns on hash-based merge when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) { + description = (isHashBasedMerge + ? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT)); + return this; + } + + public boolean isSplitUpdate() { + return (description & SPLIT_UPDATE_BIT) > 0; + } + + /** + * Returns the appropriate schema type for the decorated ACID columns based on the + * acid operational properties defined. + * @return the acid event schema type. 
+ */ + public AcidEventSchemaType getAcidEventSchemaType() { + if (isSplitUpdate()) { + return AcidEventSchemaType.SPLIT_UPDATE_SCHEMA; + } else { + return AcidEventSchemaType.SIMPLE_SCHEMA; + } + } + + public boolean isHashBasedMerge() { + return (description & HASH_BASED_MERGE_BIT) > 0; + } + + public int toInt() { + return description; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + if (isSplitUpdate()) { + str.append("|" + SPLIT_UPDATE_STRING); + } + if (isHashBasedMerge()) { + str.append("|" + HASH_BASED_MERGE_STRING); + } + return str.toString(); + } + } public static interface Directory { @@ -745,4 +906,84 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + + /** + * Sets the acidOperationalProperties in the configuration object argument. + * @param conf Mutable configuration object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties(Configuration conf, + AcidOperationalProperties properties) { + if (properties != null) { + HiveConf.setIntVar(conf, ConfVars.HIVE_ACID_OPERATIONAL_PROPERTIES, properties.toInt()); + } + } + + /** + * Sets the acidOperationalProperties in the map object argument. + * @param parameters Mutable map object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties( + Map parameters, AcidOperationalProperties properties) { + if (properties != null) { + parameters.put(ConfVars.HIVE_ACID_OPERATIONAL_PROPERTIES.varname, properties.toString()); + } + } + + /** + * Returns the acidOperationalProperties for a given table. + * @param table A table object + * @return the acidOperationalProperties object for the corresponding table. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Table table) { + String transactionalProperties = table.getProperty( + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (transactionalProperties == null) { + // If the table does not define any transactional properties, we return a legacy type. + return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(transactionalProperties); + } + + /** + * Returns the acidOperationalProperties for a given configuration. + * @param conf A configuration object + * @return the acidOperationalProperties object for the corresponding configuration. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) { + // If the conf does not define any transactional properties, the parseInt() should receive + // a value of zero, which will set AcidOperationalProperties to a legacy type and return that. + return AcidOperationalProperties.parseInt( + HiveConf.getIntVar(conf, ConfVars.HIVE_ACID_OPERATIONAL_PROPERTIES)); + } + + /** + * Returns the acidOperationalProperties for a given set of properties. + * @param props A properties object + * @return the acidOperationalProperties object for the corresponding properties. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Properties props) { + String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the properties does not define any transactional properties, we return a legacy type. 
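A short usage sketch of the helpers defined in this class (assuming a build containing this patch; the printed messages are placeholders): readers pull the properties back out of the job configuration and branch on the split-update flag, much as OrcRawRecordMerger does further down in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidOperationalPropertiesUsageSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // An unset value decodes to 0, which parseInt() maps to the legacy behavior.
    AcidUtils.AcidOperationalProperties props = AcidUtils.getAcidOperationalProperties(conf);
    if (props.isSplitUpdate()) {
      System.out.println("update events are written as delete + insert pairs");
    } else {
      System.out.println("legacy/simple acid event layout");
    }
    // Writers propagate the chosen behavior into the job configuration for downstream readers.
    AcidUtils.setAcidOperationalProperties(conf, AcidUtils.AcidOperationalProperties.getDefault());
  }
}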
+ return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } + + /** + * Returns the acidOperationalProperties for a given map. + * @param parameters A parameters object + * @return the acidOperationalProperties object for the corresponding map. + */ + public static AcidOperationalProperties getAcidOperationalProperties( + Map parameters) { + String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the parameters does not define any transactional properties, we return a legacy type. + return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index cfedf35..6e90c42 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -654,6 +654,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass pushFilters(jobConf, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index d7a8c2f..f5ed422 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1781,13 +1781,7 @@ public ObjectInspector getObjectInspector() { @Override public boolean next(RecordIdentifier recordIdentifier, OrcStruct orcStruct) throws IOException { - boolean result; - // filter out the deleted records - do { - result = records.next(recordIdentifier, innerRecord); - } while (result && - OrcRecordUpdater.getOperation(innerRecord) == - OrcRecordUpdater.DELETE_OPERATION); + boolean result = records.next(recordIdentifier, innerRecord); if (result) { // swap the fields with the passed in orcStruct orcStruct.linkFields(OrcRecordUpdater.getRow(innerRecord)); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index b0f8c8b..82dd498 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -285,6 +285,9 @@ public RecordUpdater getRecordUpdater(Path path, final Path filename = AcidUtils.createFilename(path, options); final OrcFile.WriterOptions opts = OrcFile.writerOptions(options.getConfiguration()); + final AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + if (!options.isWritingBase()) { opts.bufferSize(OrcRecordUpdater.DELTA_BUFFER_SIZE) .stripeSize(OrcRecordUpdater.DELTA_STRIPE_SIZE) @@ -306,7 +309,8 @@ public void write(Writable w) throws IOException { ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), - ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); + ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get(), + acidOperationalProperties.getAcidEventSchemaType()); writer.addRow(w); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index b9094bf..a6bf3dd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -19,27 +19,30 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.orc.OrcUtils; -import org.apache.orc.StripeInformation; -import org.apache.orc.TypeDescription; -import org.apache.orc.impl.AcidStats; -import org.apache.orc.impl.OrcAcidUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.orc.OrcUtils; +import org.apache.orc.Reader.Options; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -52,6 +55,7 @@ private static final Logger LOG = LoggerFactory.getLogger(OrcRawRecordMerger.class); private final Configuration conf; + private final AcidUtils.AcidOperationalProperties acidOperationalProperties; private final boolean collapse; private final RecordReader baseReader; private final ObjectInspector objectInspector; @@ -124,6 +128,9 @@ public boolean equals(Object other) { @Override public int compareTo(RecordIdentifier other) { int sup = compareToInternal(other); + // In the new version of Update=Insert+Delete, the following if clause becomes + // redundant. The if-clause will always be skipped (i.e. sup will never be zero). + // This is because we will not have any records with same reader key triplet. if (sup == 0) { if (other.getClass() == ReaderKey.class) { ReaderKey oth = (ReaderKey) other; @@ -146,7 +153,7 @@ public int compareTo(RecordIdentifier other) { private boolean isSameRow(ReaderKey other) { return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; } - + public long getCurrentTransactionId() { return currentTransactionId; } @@ -176,6 +183,7 @@ public String toString() { */ static class ReaderPair { OrcStruct nextRecord; + final AcidUtils.AcidOperationalProperties acidOperationalProperties; final Reader reader; final RecordReader recordReader; final ReaderKey key; @@ -196,9 +204,11 @@ public String toString() { * @param statementId id of SQL statement within a transaction * @throws IOException */ - ReaderPair(ReaderKey key, Reader reader, int bucket, + ReaderPair(AcidUtils.AcidOperationalProperties acidOperationalProperties, + ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, ReaderImpl.Options options, int statementId) throws IOException { + this.acidOperationalProperties = acidOperationalProperties; this.reader = reader; this.key = key; this.maxKey = maxKey; @@ -246,21 +256,24 @@ int getColumns() { * makes the relevant translations. 
*/ static final class OriginalReaderPair extends ReaderPair { - OriginalReaderPair(ReaderKey key, Reader reader, int bucket, + OriginalReaderPair(AcidUtils.AcidOperationalProperties acidOperationalProperties, + ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, Reader.Options options) throws IOException { - super(key, reader, bucket, minKey, maxKey, options, 0); + super(acidOperationalProperties, key, reader, bucket, minKey, maxKey, options, 0); } @Override void next(OrcStruct next) throws IOException { if (recordReader.hasNext()) { + final int total_number_of_fields = OrcRecordUpdater.getTotalNumberOfFields( + acidOperationalProperties.getAcidEventSchemaType()); long nextRowId = recordReader.getRowNumber(); // have to do initialization here, because the super's constructor // calls next and thus we need to initialize before our constructor // runs if (next == null) { - nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); + nextRecord = new OrcStruct(total_number_of_fields); IntWritable operation = new IntWritable(OrcRecordUpdater.INSERT_OPERATION); nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation); @@ -274,6 +287,17 @@ void next(OrcStruct next) throws IOException { new LongWritable(nextRowId)); nextRecord.setFieldValue(OrcRecordUpdater.ROW, recordReader.next(null)); + if (acidOperationalProperties.getAcidEventSchemaType() + == AcidUtils.AcidEventSchemaType.SPLIT_UPDATE_SCHEMA) { + nextRecord.setFieldValue(OrcRecordUpdater.DELETED_CURRENT_TRANSACTION, + new LongWritable(OrcRecordUpdater.NON_EXISTENT_VALUE)); + nextRecord.setFieldValue(OrcRecordUpdater.DELETED_ORIGINAL_TRANSACTION, + new LongWritable(OrcRecordUpdater.NON_EXISTENT_VALUE)); + nextRecord.setFieldValue(OrcRecordUpdater.DELETED_BUCKET, + new IntWritable(bucket)); + nextRecord.setFieldValue(OrcRecordUpdater.DELETED_ROW_ID, + new LongWritable(OrcRecordUpdater.NON_EXISTENT_VALUE)); + } } else { nextRecord = next; ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) @@ -288,6 +312,17 @@ void next(OrcStruct next) throws IOException { .set(0); nextRecord.setFieldValue(OrcRecordUpdater.ROW, recordReader.next(OrcRecordUpdater.getRow(next))); + if (acidOperationalProperties.getAcidEventSchemaType() + == AcidUtils.AcidEventSchemaType.SPLIT_UPDATE_SCHEMA) { + ((LongWritable) next.getFieldValue(OrcRecordUpdater.DELETED_ORIGINAL_TRANSACTION)) + .set(OrcRecordUpdater.NON_EXISTENT_VALUE); + ((IntWritable) next.getFieldValue(OrcRecordUpdater.DELETED_BUCKET)) + .set(bucket); + ((LongWritable) next.getFieldValue(OrcRecordUpdater.DELETED_CURRENT_TRANSACTION)) + .set(OrcRecordUpdater.NON_EXISTENT_VALUE); + ((LongWritable) next.getFieldValue(OrcRecordUpdater.DELETED_ROW_ID)) + .set(OrcRecordUpdater.NON_EXISTENT_VALUE); + } } key.setValues(0L, bucket, nextRowId, 0L, 0); if (maxKey != null && key.compareRow(maxKey) > 0) { @@ -309,15 +344,81 @@ int getColumns() { } } + /** + * A reader that wraps the underlying reader's row with an + * ACID delete event object and makes the relevant translations. 
+ */ + static final class DeletedEventReaderPair extends ReaderPair { + private final int statementId; + + DeletedEventReaderPair(AcidUtils.AcidOperationalProperties acidOperationalProperties, + ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, + Reader.Options options, int statementId) throws IOException { + super(acidOperationalProperties, key, reader, bucket, minKey, maxKey, options, statementId); + this.statementId = statementId; + } + + @Override + void next(OrcStruct next) throws IOException { + if (recordReader.hasNext()) { + nextRecord = (OrcStruct) recordReader.next(next); + // set the key + key.setValues(OrcRecordUpdater.getDeletedOriginalTransaction(nextRecord), + OrcRecordUpdater.getDeletedBucket(nextRecord), + OrcRecordUpdater.getDeletedRowId(nextRecord), + OrcRecordUpdater.getDeletedCurrentTransaction(nextRecord), + 0); + + + // if this record is larger than maxKey, we need to stop + if (maxKey != null && key.compareRow(maxKey) > 0) { + LOG.debug("key " + key + " > maxkey " + maxKey); + nextRecord = null; + recordReader.close(); + } + } else { + nextRecord = null; + recordReader.close(); + } + } + + @Override + int getColumns() { + return reader.getTypes().get(0).getSubtypesCount(); + } + } + private final TreeMap readers = new TreeMap(); + private final TreeMap deletedEventReaders = + new TreeMap(new Comparator() { + @Override + public int compare(ReaderKey o1, ReaderKey o2) { + int cmp = o1.compareRow(o2); + if (cmp == 0) { + // Sort by currentTransactionId,statementId ascending, if equal. + if (o1.currentTransactionId != o2.currentTransactionId) { + return o1.currentTransactionId < o2.currentTransactionId ? -1 : +1; + } + if (o1.statementId != o2.statementId) { + return o1.statementId < o2.statementId ? -1 : +1; + } + } + return cmp; + } + }); + // The reader that currently has the lowest key. private ReaderPair primary; // The key of the next lowest reader. private ReaderKey secondaryKey = null; + // The deleted event reader that currently has the lowest key. + private ReaderPair deletedEventPrimary; + /** * Find the key range for original bucket files. 
* @param reader the reader @@ -390,28 +491,30 @@ private void discoverKeyBounds(Reader reader, * @param options options for the row reader * @return a cloned options object that is modified for the event reader */ - static Reader.Options createEventOptions(Reader.Options options) { + static Reader.Options createEventOptions(Reader.Options options, + AcidUtils.AcidEventSchemaType acidEventSchemaType) { Reader.Options result = options.clone(); + final int total_number_of_fields = OrcRecordUpdater.getTotalNumberOfFields(acidEventSchemaType); result.range(options.getOffset(), Long.MAX_VALUE); - // slide the columns down by 6 for the include array + // slide the columns down by as many fields before the actual row for the include array if (options.getInclude() != null) { boolean[] orig = options.getInclude(); // we always need the base row orig[0] = true; - boolean[] include = new boolean[orig.length + OrcRecordUpdater.FIELDS]; - Arrays.fill(include, 0, OrcRecordUpdater.FIELDS, true); - for(int i= 0; i < orig.length; ++i) { - include[i + OrcRecordUpdater.FIELDS] = orig[i]; + boolean[] include = new boolean[orig.length + total_number_of_fields]; + Arrays.fill(include, 0, (OrcRecordUpdater.ROW + 1), true); + for (int i = 0; i < orig.length; ++i) { + include[i + (OrcRecordUpdater.ROW + 1)] = orig[i]; } result.include(include); } - // slide the column names down by 6 for the name array + // slide the column names down by as many fields before the actual row for the name array if (options.getColumnNames() != null) { String[] orig = options.getColumnNames(); - String[] cols = new String[orig.length + OrcRecordUpdater.FIELDS]; - for(int i=0; i < orig.length; ++i) { - cols[i + OrcRecordUpdater.FIELDS] = orig[i]; + String[] cols = new String[orig.length + total_number_of_fields]; + for (int i = 0; i < orig.length; ++i) { + cols[i + (OrcRecordUpdater.ROW + 1)] = orig[i]; } result.searchArgument(options.getSearchArgument(), cols); } @@ -437,6 +540,7 @@ private void discoverKeyBounds(Reader reader, Reader.Options options, Path[] deltaDirectory) throws IOException { this.conf = conf; + this.acidOperationalProperties = AcidUtils.getAcidOperationalProperties(conf); this.collapse = collapseEvents; this.offset = options.getOffset(); this.length = options.getLength(); @@ -445,11 +549,13 @@ private void discoverKeyBounds(Reader reader, TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); - objectInspector = OrcRecordUpdater.createEventSchema - (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); + objectInspector = OrcRecordUpdater.createEventSchema( + OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)), + acidOperationalProperties.getAcidEventSchemaType()); // modify the options to reflect the event instead of the base row - Reader.Options eventOptions = createEventOptions(options); + Reader.Options eventOptions = createEventOptions(options, + acidOperationalProperties.getAcidEventSchemaType()); if (reader == null) { baseReader = null; } else { @@ -467,10 +573,10 @@ private void discoverKeyBounds(Reader reader, if (isOriginal) { options = options.clone(); options.range(options.getOffset(), Long.MAX_VALUE); - pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey, - options); + pair = new OriginalReaderPair(acidOperationalProperties, key, reader, bucket, minKey, + maxKey, options); } else { - pair = new ReaderPair(key, reader, bucket, minKey, maxKey, + pair = new ReaderPair(acidOperationalProperties, key, reader, bucket, 
minKey, maxKey, eventOptions, 0); } @@ -490,25 +596,13 @@ private void discoverKeyBounds(Reader reader, AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); FileSystem fs = deltaFile.getFileSystem(conf); long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile); - if (length != -1 && fs.exists(deltaFile)) { - Reader deltaReader = OrcFile.createReader(deltaFile, - OrcFile.readerOptions(conf).maxLength(length)); - Reader.Options deltaEventOptions = null; - if(eventOptions.getSearchArgument() != null) { - // Turn off the sarg before pushing it to delta. We never want to push a sarg to a delta as - // it can produce wrong results (if the latest valid version of the record is filtered out by - // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record) - // unless the delta only has insert events - AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader); - if(acidStats.deletes > 0 || acidStats.updates > 0) { - deltaEventOptions = eventOptions.clone().searchArgument(null, null); - } - } - ReaderPair deltaPair; - deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, - maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); - if (deltaPair.nextRecord != null) { - readers.put(key, deltaPair); + if (length != -1 && fs.exists(deltaFile)) { + if (acidOperationalProperties.isSplitUpdate()) { + initializeSplitUpdateDeltaReaders(key, bucket, deltaDir, deltaFile, length, + eventOptions, options); + } else { + initializeSimpleDeltaReaders(key, bucket, deltaDir, deltaFile, length, + eventOptions, options); } } } @@ -529,6 +623,130 @@ private void discoverKeyBounds(Reader reader, // get the number of columns in the user's rows columns = primary.getColumns(); } + + if (acidOperationalProperties.isSplitUpdate()) { + // Get the first deleted record. + entry = deletedEventReaders.pollFirstEntry(); + if (entry == null) { + deletedEventPrimary = null; + } else { + // The following do-while loop skips over all the deleted events that do not correspond to + // any actual delete event. Basically these have (-1,-1,-1) as their record identifiers. + // Since we are using a min-heap, it discards the deleted events that are present at the + // beginning of these readers, until we find a reader which does not have one. + do { + deletedEventPrimary = entry.getValue(); + if (!hasValidDeletedEvent(deletedEventPrimary.key)) { + deletedEventPrimary = null; + entry = deletedEventReaders.pollFirstEntry(); + } else { + break; + } + } while (entry != null); + } + } + } + + private void initializeSimpleDeltaReaders(ReaderKey key, int bucket, ParsedDelta deltaDir, + Path deltaFile, long length, Reader.Options eventOptions, + Reader.Options originalOptions) throws IOException { + Reader deltaReader = OrcFile.createReader(deltaFile, + OrcFile.readerOptions(conf).maxLength(length)); + Reader.Options deltaEventOptions = null; + if (eventOptions.getSearchArgument() != null) { + // Turn off the sarg before pushing it to delta. 
We never want to push a sarg to a delta as + // it can produce wrong results (if the latest valid version of the record is filtered out by + // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record) + // unless the delta only has insert events + AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader); + if (acidStats.deletes > 0 || acidStats.updates > 0) { + deltaEventOptions = eventOptions.clone().searchArgument(null, null); + } + } + ReaderPair deltaPair; + deltaPair = new ReaderPair(acidOperationalProperties, key, deltaReader, bucket, minKey, maxKey, + deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); + if (deltaPair.nextRecord != null) { + readers.put(key, deltaPair); + } + } + + private void initializeSplitUpdateDeltaReaders(ReaderKey key, int bucket, ParsedDelta deltaDir, + Path deltaFile, long length, Reader.Options eventOptions, + Reader.Options originalOptions) throws IOException { + Reader deltaReader = OrcFile.createReader(deltaFile, + OrcFile.readerOptions(conf).maxLength(length)); + Reader.Options deltaEventOptions = null; + boolean canSkipDeltaFile = false; + // We can skip reading this delta file if it only has delete events & no insert events. + // These delete events in this delete-only delta file will be picked up by + // the deleted event reader down below. + AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader); + if (acidStats.deletes > 0 && acidStats.inserts == acidStats.deletes) { + // What this condition means is that all inserted records where due to delete events. + // Note that each delete/update event also bumps up the insert counter. + canSkipDeltaFile = true; + } + + // Now there is no need to turn-off search arguments for the delta readers + // in the new version, where each update event is written as a pair of delete and + // insert event. Therefore, the delta readers will only read insert events and + // we can enable PPD without any side-effects or correctness issue. Hence, the following + // lines are commented, which were applicable to previous type of acid files. + + if (!canSkipDeltaFile) { + ReaderPair deltaPair; + deltaPair = new ReaderPair(acidOperationalProperties, key, deltaReader, bucket, minKey, + maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, + deltaDir.getStatementId()); + if (deltaPair.nextRecord != null) { + readers.put(key, deltaPair); + } + } + + // Create a set of readers for delete events. + Reader deletedDeltaReader = OrcFile.createReader(deltaFile, + OrcFile.readerOptions(conf).maxLength(length)); + // We can skip reading this deleted event delta file if it only has insert events + // and no update/delete events. + acidStats = OrcAcidUtils.parseAcidStats(deletedDeltaReader); + if (acidStats.inserts > 0 && acidStats.updates == 0 && acidStats.deletes == 0) { + return; // Skip everything below. + } + // The reader is only required to read the past event columns. + boolean[] includedColumns = new boolean[eventOptions.getInclude().length]; + Arrays.fill(includedColumns, true); + // An offset is added to account for the ORC underlying column index layout. 
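To make the delete-only-delta skip decision earlier in this method concrete, here is a stand-alone sketch of just that predicate (the Stats holder is a simplified stand-in for org.apache.orc.impl.AcidStats; only its inserts and deletes counters are used):

public class DeltaSkipDecisionSketch {
  // Simplified stand-in for AcidStats with only the two counters the decision needs.
  static final class Stats { long inserts; long deletes; }

  // Mirrors the condition above: the insert-event reader can skip a delta when every recorded
  // insert was produced by a delete event (each delete/update also bumps the insert counter).
  static boolean canSkipForInsertReader(Stats s) {
    return s.deletes > 0 && s.inserts == s.deletes;
  }

  public static void main(String[] args) {
    Stats deleteOnly = new Stats(); deleteOnly.inserts = 5; deleteOnly.deletes = 5;
    Stats mixed = new Stats();      mixed.inserts = 8;      mixed.deletes = 3;
    System.out.println(canSkipForInsertReader(deleteOnly)); // true
    System.out.println(canSkipForInsertReader(mixed));      // false
  }
}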
+ int offsetFromOriginalInclude = 0; + if (originalOptions != null && originalOptions.getInclude() != null) { + offsetFromOriginalInclude = originalOptions.getInclude().length; + } + includedColumns[OrcRecordUpdater.DELETED_ORIGINAL_TRANSACTION + + offsetFromOriginalInclude] = true; + includedColumns[OrcRecordUpdater.DELETED_BUCKET + + offsetFromOriginalInclude] = true; + includedColumns[OrcRecordUpdater.DELETED_ROW_ID + offsetFromOriginalInclude] = true; + includedColumns[OrcRecordUpdater.DELETED_CURRENT_TRANSACTION + + offsetFromOriginalInclude] = true; + Reader.Options deletedDeltaEventOptions = eventOptions.clone().include(includedColumns); + // Turn-off the sargs for the past event columns. Deleted column values + // have no relation with the current record and hence sargs are non-applicable. + deletedDeltaEventOptions.searchArgument(null, null); + ReaderKey deletedKey = new ReaderKey(); + DeletedEventReaderPair deletedDeltaPair = new DeletedEventReaderPair(acidOperationalProperties, + deletedKey, deletedDeltaReader, bucket, minKey, maxKey, deletedDeltaEventOptions, + deltaDir.getStatementId()); + if (deletedDeltaPair.nextRecord != null) { + deletedEventReaders.put(deletedKey, deletedDeltaPair); + } + } + + private boolean hasValidDeletedEvent(ReaderKey key) { + // This checks whether the delete event has placeholder values and can be skipped or not. + long invalidValue = OrcRecordUpdater.NON_EXISTENT_VALUE; + return (key.getTransactionId() != invalidValue + && key.getBucketId() != invalidValue + && key.getRowId() != invalidValue); } @VisibleForTesting @@ -598,6 +816,21 @@ public boolean next(RecordIdentifier recordIdentifier, continue; } + // If this record has a delete operation type, it will have no valid user data. + // We can just skip over these records with no side-effects. + if (OrcRecordUpdater.getOperation(current) == OrcRecordUpdater.DELETE_OPERATION) { + continue; + } + + if (acidOperationalProperties.isSplitUpdate()) { + boolean isRecordDeleted = modifyCurrentRecordIfDeleted(recordIdentifier, current); + if (collapse && isRecordDeleted) { + // This record produced by the set of insert readers has been deleted. + // We should discard this record and continue to find another one. + continue; + } + } + /*for multi-statement txns, you may have multiple events for the same * row in the same (current) transaction. We want to collapse these to just the last one * regardless whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the @@ -624,6 +857,112 @@ public boolean next(RecordIdentifier recordIdentifier, return !keysSame; } + private boolean modifyCurrentRecordIfDeleted(RecordIdentifier insertRecordIdentifier, + OrcStruct insertRecord) throws IOException { + boolean isRecordDeleted = false; + ReaderKey deletedEventKey = null; + if (deletedEventPrimary != null) { + int cmp = ((ReaderKey) insertRecordIdentifier).compareRow(deletedEventPrimary.key); + if (cmp == -1) { + // We can use placeholder values for the delete columns, as this record is not deleted. + deletedEventKey = new ReaderKey(); + } else if (cmp == 0) { + if (validTxnList.isTxnValid(deletedEventPrimary.key.getCurrentTransactionId())) { + deletedEventKey = deletedEventPrimary.key; + isRecordDeleted = true; + } else { + // This code path should never be triggered because whenever we update the + // deletedEventPrimary, we make sure that we only consider events from valid transactions. 
+          throw new AssertionError("Found an invalid transaction when collapsing delete events.");
+        }
+        // The following small do..while loop takes care of the condition when we have multiple
+        // delete entries for the same record. This can happen in the case of concurrent deletes.
+        // Say txn#1 and txn#2 were started concurrently and they both deleted the same record
+        // before committing. Now, txn#3 would see two delete entries for the same record, when
+        // reading the data back. The fix is to only consider the earliest delete event and discard
+        // the rest. Since delete events are sorted in ascending order by current transaction id,
+        // (check the comparator for the deletedEventReaders) we have already considered
+        // the earliest delete event above. The following loop discards the rest of
+        // the delete events with the same key.
+        boolean areKeysStillSame = false;
+        do {
+          updateDeletedEventPrimary(); // Advance the deletedEventPrimary.
+          if (deletedEventPrimary == null) {
+            break;
+          }
+          areKeysStillSame =
+              ((ReaderKey) insertRecordIdentifier).compareRow(deletedEventPrimary.key) == 0;
+        } while (areKeysStillSame);
+      } else {
+        // This code path should not usually be triggered because all the insert keys are being
+        // processed in a sorted order and every delete key must have a corresponding insert key.
+        // The only case this can be triggered is when we have concurrent deletes. For example,
+        // say txn#1 and txn#3 are running concurrently and they both want to delete a record.
+        // If a major compaction happens by txn#2 that commits after txn#1 but before txn#3,
+        // then txn#3 delete will not correspond to a valid record (since txn#2 would delete it).
+        // Then we can have a case where insert_primary_key > delete_primary_key.
+        // The fix for such cases is to just discard the current delete_primary_key.
+
+        // Advance the deletedEventPrimary.
+        updateDeletedEventPrimary();
+
+        // Recursion, call again.
+        return modifyCurrentRecordIfDeleted(insertRecordIdentifier, insertRecord);
+      }
+    } else {
+      // We can use placeholder values for the delete columns, as this record is not deleted.
+      deletedEventKey = new ReaderKey();
+    }
+
+    insertRecord.setFieldValue(OrcRecordUpdater.DELETED_ORIGINAL_TRANSACTION,
+        new LongWritable(deletedEventKey.getTransactionId()));
+    insertRecord.setFieldValue(OrcRecordUpdater.DELETED_BUCKET,
+        new IntWritable(deletedEventKey.getBucketId()));
+    insertRecord.setFieldValue(OrcRecordUpdater.DELETED_ROW_ID,
+        new LongWritable(deletedEventKey.getRowId()));
+    insertRecord.setFieldValue(OrcRecordUpdater.DELETED_CURRENT_TRANSACTION,
+        new LongWritable(deletedEventKey.getCurrentTransactionId()));
+
+    return isRecordDeleted;
+  }
+
+  private void updateDeletedEventPrimary() throws IOException {
+    if (deletedEventPrimary != null) {
+      // Advance the current deletedEventPrimary.
+      deletedEventPrimary.next(deletedEventPrimary.nextRecord);
+      // Add this deletedEventPrimary back to the map, if there are more records to read from it.
+      if (deletedEventPrimary.nextRecord != null) {
+        deletedEventReaders.put(deletedEventPrimary.key, deletedEventPrimary);
+      }
+      // Update deletedEventPrimary to point to the new lowest record among all deletedEventReaders.
+      Map.Entry entry = null;
+      do {
+        entry = deletedEventReaders.pollFirstEntry();
+        if (entry != null) {
+          deletedEventPrimary = entry.getValue();
+
+          // Assertion: While collapsing the events or even otherwise, we can safely discard delete
+          // events created by transactions that are not valid. We can read from delta files of
+          // invalid transactions in two cases: (1) when such delta files were created by aborted
+          // transactions in which case discarding has no side-effect or, (2) when such delta files
+          // are created by streaming ingest APIs that do not have any update/delete events at all.
+          if (!validTxnList.isTxnValid(entry.getValue().key.getCurrentTransactionId())) {
+            continue;
+          }
+
+          // Check if the deleted event column has valid entries, otherwise it can be discarded
+          // without any side-effects & we should continue to loop until we find one.
+          if (hasValidDeletedEvent(deletedEventPrimary.key)) {
+            break; // Found the next lowest record among all the deletedEventReaders.
+          }
+        } else {
+          // We have exhausted all the delete event readers, no further delete events will be found.
+          deletedEventPrimary = null;
+        }
+      } while (entry != null);
+    }
+  }
+
   @Override
   public RecordIdentifier createKey() {
     return new ReaderKey();
@@ -631,7 +970,9 @@ public RecordIdentifier createKey() {
   @Override
   public OrcStruct createValue() {
-    return new OrcStruct(OrcRecordUpdater.FIELDS);
+    final int total_number_of_fields = OrcRecordUpdater.getTotalNumberOfFields(
+        acidOperationalProperties.getAcidEventSchemaType());
+    return new OrcStruct(total_number_of_fields);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index e577961..2958e27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.IntWritable;
@@ -71,14 +72,24 @@
   final static int ROW_ID = 3;
   final static int CURRENT_TRANSACTION = 4;
   final static int ROW = 5;
-  final static int FIELDS = 6;
+  final static int DELETED_ORIGINAL_TRANSACTION = 6;
+  final static int DELETED_BUCKET = 7;
+  final static int DELETED_ROW_ID = 8;
+  final static int DELETED_CURRENT_TRANSACTION = 9;
+
+  final static int FIELDS_IN_SIMPLE_SCHEMA = 6;
+  final static int FIELDS_IN_SPLIT_UPDATE_SCHEMA = 10;
+
   final static int DELTA_BUFFER_SIZE = 16 * 1024;
   final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
+  final static int NON_EXISTENT_VALUE = -1;
+
   private static final Charset UTF8 = Charset.forName("UTF-8");
   private final AcidOutputFormat.Options options;
+  private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
   private final Path path;
   private final FileSystem fs;
   private Writer writer;
@@ -89,6 +100,11 @@
   private final LongWritable originalTransaction = new LongWritable(-1);
   private final IntWritable bucket = new IntWritable();
   private final LongWritable rowId = new LongWritable();
+  private final LongWritable deletedOriginalTransaction = new LongWritable(-1);
+  private final IntWritable deletedBucket = new IntWritable();
+  private final LongWritable deletedRowId = new LongWritable();
+  private final LongWritable deletedCurrentTransaction = new LongWritable(-1);
+
   private long insertedRows = 0;
   private long rowIdOffset = 0;
   // This records how many rows have been inserted or deleted. It is separate from insertedRows
@@ -98,12 +114,21 @@
   private StructField recIdField = null; // field to look for the record identifier in
   private StructField rowIdField = null; // field inside recId to look for row id in
   private StructField originalTxnField = null; // field inside recId to look for original txn in
+  private StructField bucketField = null; // field inside recId to look for bucket in
   private StructObjectInspector rowInspector; // OI for the original row
   private StructObjectInspector recIdInspector; // OI for the record identifier struct
   private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
   private LongObjectInspector origTxnInspector; // OI for the original txn inside the record
+  private IntObjectInspector bucketInspector; // OI for the bucket inside the record
   // identifer
+  static int getTotalNumberOfFields(AcidUtils.AcidEventSchemaType acidEventSchemaType) {
+    if (acidEventSchemaType == AcidUtils.AcidEventSchemaType.SPLIT_UPDATE_SCHEMA) {
+      return FIELDS_IN_SPLIT_UPDATE_SCHEMA;
+    }
+    return FIELDS_IN_SIMPLE_SCHEMA;
+  }
+
   static int getOperation(OrcStruct struct) {
     return ((IntWritable) struct.getFieldValue(OPERATION)).get();
   }
@@ -132,6 +157,22 @@ static OrcStruct getRow(OrcStruct struct) {
     }
   }
 
+  static long getDeletedOriginalTransaction(OrcStruct struct) {
+    return ((LongWritable) struct.getFieldValue(DELETED_ORIGINAL_TRANSACTION)).get();
+  }
+
+  static int getDeletedBucket(OrcStruct struct) {
+    return ((IntWritable) struct.getFieldValue(DELETED_BUCKET)).get();
+  }
+
+  static long getDeletedRowId(OrcStruct struct) {
+    return ((LongWritable) struct.getFieldValue(DELETED_ROW_ID)).get();
+  }
+
+  static long getDeletedCurrentTransaction(OrcStruct struct) {
+    return ((LongWritable) struct.getFieldValue(DELETED_CURRENT_TRANSACTION)).get();
+  }
+
   /**
    * An extension to AcidOutputFormat that allows users to add additional
    * options.
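
For reference, the hunks above give OrcRecordUpdater two ACID event layouts: the existing six-column schema (operation, originalTransaction, bucket, rowId, currentTransaction, row) and a ten-column split-update schema whose four extra columns record the original transaction, bucket, row id and current transaction of the row being deleted, with NON_EXISTENT_VALUE (-1) as the placeholder when no delete applies. The standalone sketch below restates that field-count and placeholder logic; the class and method names are illustrative only and are not part of this patch.

// Illustrative sketch only -- not part of the patch. Field counts and the -1
// placeholder mirror the constants declared in OrcRecordUpdater above.
public class AcidEventLayoutSketch {
  static final int FIELDS_IN_SIMPLE_SCHEMA = 6;
  static final int FIELDS_IN_SPLIT_UPDATE_SCHEMA = 10;
  static final int NON_EXISTENT_VALUE = -1;

  // The same decision getTotalNumberOfFields() makes, expressed over a boolean.
  static int totalNumberOfFields(boolean splitUpdateSchema) {
    return splitUpdateSchema ? FIELDS_IN_SPLIT_UPDATE_SCHEMA : FIELDS_IN_SIMPLE_SCHEMA;
  }

  // A split-update event describes a real delete only when none of its deleted
  // columns hold the placeholder (compare hasValidDeletedEvent() in the merger
  // changes earlier in this patch).
  static boolean carriesDeleteEvent(long deletedOriginalTxn, int deletedBucket, long deletedRowId) {
    return deletedOriginalTxn != NON_EXISTENT_VALUE
        && deletedBucket != NON_EXISTENT_VALUE
        && deletedRowId != NON_EXISTENT_VALUE;
  }

  public static void main(String[] args) {
    System.out.println(totalNumberOfFields(false));      // 6
    System.out.println(totalNumberOfFields(true));       // 10
    System.out.println(carriesDeleteEvent(-1, -1, -1));  // false: nothing was deleted
  }
}
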
@@ -159,7 +200,8 @@ public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
    * @param rowInspector the row's object inspector
    * @return an object inspector for the event stream
    */
-  static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
+  static StructObjectInspector createEventSchema(ObjectInspector rowInspector,
+      AcidUtils.AcidEventSchemaType acidEventSchemaType) {
     List fields = new ArrayList();
     fields.add(new OrcStruct.Field("operation",
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
@@ -173,13 +215,38 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
     fields.add(new OrcStruct.Field("currentTransaction",
         PrimitiveObjectInspectorFactory.writableLongObjectInspector,
         CURRENT_TRANSACTION));
+    fields.add(new OrcStruct.Field("row", rowInspector, ROW));
+
+    if (acidEventSchemaType == AcidUtils.AcidEventSchemaType.SPLIT_UPDATE_SCHEMA) {
+      fields.add(new OrcStruct.Field("deletedOriginalTransaction",
+          PrimitiveObjectInspectorFactory.writableLongObjectInspector,
+          DELETED_ORIGINAL_TRANSACTION));
+      fields.add(new OrcStruct.Field("deletedBucket",
+          PrimitiveObjectInspectorFactory.writableIntObjectInspector,
+          DELETED_BUCKET));
+      fields.add(new OrcStruct.Field("deletedRowId",
+          PrimitiveObjectInspectorFactory.writableLongObjectInspector,
+          DELETED_ROW_ID));
+      fields.add(new OrcStruct.Field("deletedCurrentTransaction",
+          PrimitiveObjectInspectorFactory.writableLongObjectInspector,
+          DELETED_CURRENT_TRANSACTION));
+    }
     return new OrcStruct.OrcStructInspector(fields);
   }
 
   OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException {
-    this.options = options;
+    this.options = options;
+    // Initialize acidOperationalProperties based on table properties, and
+    // if they are not available, see if we can find it in the job configuration.
+    if (options.getTableProperties() != null) {
+      this.acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(options.getTableProperties());
+    } else {
+      this.acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+    }
     this.bucket.set(options.getBucket());
     this.path = AcidUtils.createFilename(path, options);
     FileSystem fs = options.getFilesystem();
@@ -239,14 +306,22 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
       writerOptions.fileSystem(fs).callback(indexBuilder);
       rowInspector = (StructObjectInspector)options.getInspector();
       writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
-          options.getRecordIdColumn())));
+          options.getRecordIdColumn()), acidOperationalProperties.getAcidEventSchemaType()));
       this.writer = OrcFile.createWriter(this.path, writerOptions);
-    item = new OrcStruct(FIELDS);
+    item = new OrcStruct(getTotalNumberOfFields(
+        acidOperationalProperties.getAcidEventSchemaType()));
     item.setFieldValue(OPERATION, operation);
     item.setFieldValue(CURRENT_TRANSACTION, currentTransaction);
     item.setFieldValue(ORIGINAL_TRANSACTION, originalTransaction);
     item.setFieldValue(BUCKET, bucket);
     item.setFieldValue(ROW_ID, rowId);
+    if (acidOperationalProperties.getAcidEventSchemaType()
+        == AcidUtils.AcidEventSchemaType.SPLIT_UPDATE_SCHEMA) {
+      item.setFieldValue(DELETED_ORIGINAL_TRANSACTION, deletedOriginalTransaction);
+      item.setFieldValue(DELETED_BUCKET, deletedBucket);
+      item.setFieldValue(DELETED_ROW_ID, deletedRowId);
+      item.setFieldValue(DELETED_CURRENT_TRANSACTION, deletedCurrentTransaction);
+    }
   }
 
   public String toString() {
@@ -265,12 +340,13 @@ private long findRowIdOffsetForInsert() throws IOException {
    * Then,
    * 1. find the same bucket file in previous delta dir for this txn
    * 2. read the footer and get AcidStats which has insert count
-   * 2.1 if AcidStats.inserts>0 done
+   * 2.1 if AcidStats.inserts>0 add to the insert count.
    *  else go to previous delta file
    * For example, consider insert/update/insert case...*/
     if(options.getStatementId() <= 0) {
       return 0;//there is only 1 statement in this transaction (so far)
     }
+    long totalInserts = 0;
     for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
       Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
       if(!fs.exists(matchingBucket)) {
@@ -280,12 +356,10 @@ private long findRowIdOffsetForInsert() throws IOException {
       //no close() on Reader?!
       AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader);
       if(acidStats.inserts > 0) {
-        return acidStats.inserts;
+        totalInserts += acidStats.inserts;
       }
     }
-    //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
-    //find any inserts...
-    return 0;
+    return totalInserts;
   }
   // Find the record identifier column (if there) and return a possibly new ObjectInspector that
   // will strain out the record id for the underlying writer.
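
For reference, the findRowIdOffsetForInsert() change above no longer stops at the first earlier delta file that reported inserts; it sums AcidStats.inserts over every earlier statement of the same transaction so that row ids assigned by the current statement do not collide. The sketch below uses hypothetical names (rowIdOffsetForInsert, insertsPerPriorStatement) and an in-memory array in place of the delta-file footers, and is not part of this patch.

// Illustrative sketch only -- the accumulation matches the updated
// findRowIdOffsetForInsert(), with an array standing in for the per-statement
// AcidStats.inserts values parsed from earlier delta buckets.
public class RowIdOffsetSketch {
  static long rowIdOffsetForInsert(long[] insertsPerPriorStatement) {
    long totalInserts = 0;
    for (long inserts : insertsPerPriorStatement) {
      if (inserts > 0) {      // deltas without inserts contribute nothing
        totalInserts += inserts;
      }
    }
    return totalInserts;
  }

  public static void main(String[] args) {
    // Statements 0..2 of the current transaction inserted 5, 0 and 7 rows, so
    // the next statement starts assigning row ids at offset 12.
    System.out.println(rowIdOffsetForInsert(new long[]{5, 0, 7}));  // 12
  }
}
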
@@ -306,6 +380,8 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
       // in RecordIdentifier is transactionId, bucketId, rowId
       originalTxnField = fields.get(0);
       origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+      bucketField = fields.get(1);
+      bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector();
       rowIdField = fields.get(2);
       rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();
@@ -315,7 +391,7 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
     }
   }
 
-  private void addEvent(int operation, long currentTransaction, long rowId, Object row)
+  private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
     this.currentTransaction.set(currentTransaction);
@@ -333,8 +409,50 @@ else if(operation == INSERT_OPERATION) {
     }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
-    item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
-    indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
+    item.setFieldValue(ROW, (operation == DELETE_OPERATION ? null : row));
+    indexBuilder.addSimpleEventKey(operation, originalTransaction, bucket.get(), rowId);
+    writer.addRow(item);
+  }
+
+  private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
+      throws IOException {
+    this.operation.set(operation);
+    this.currentTransaction.set(currentTransaction);
+    // If this is an insert, originalTransaction should be set to this transaction. If not,
+    // it will be reset by the following if anyway.
+    long originalTransaction = currentTransaction;
+    if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      Object rowIdValue = rowInspector.getStructFieldData(row, recIdField);
+      originalTransaction = origTxnInspector.get(
+          recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
+      long prevRowId = rowIdInspector.get(
+          recIdInspector.getStructFieldData(rowIdValue, rowIdField));
+      int bucket = bucketInspector.get(recIdInspector.getStructFieldData(rowIdValue, bucketField));
+
+      // Set the deleted column values for an update/delete operation.
+      this.deletedOriginalTransaction.set(originalTransaction);
+      this.deletedBucket.set(bucket);
+      this.deletedRowId.set(prevRowId);
+      this.deletedCurrentTransaction.set(currentTransaction);
+
+      // Increment the rowId for the new record that will be inserted for an update/delete event.
+      rowId += rowIdOffset;
+      // Set originalTransaction equal to current transaction because
+      // this current transaction is creating a new record.
+      originalTransaction = currentTransaction;
+    } else if (operation == INSERT_OPERATION) {
+      rowId += rowIdOffset;
+      // Deleted values should be non-existent for an insert operation.
+      this.deletedOriginalTransaction.set(NON_EXISTENT_VALUE);
+      this.deletedBucket.set(NON_EXISTENT_VALUE);
+      this.deletedRowId.set(NON_EXISTENT_VALUE);
+      this.deletedCurrentTransaction.set(NON_EXISTENT_VALUE);
+    }
+
+    this.rowId.set(rowId);
+    this.originalTransaction.set(originalTransaction);
+    item.setFieldValue(ROW, (operation == DELETE_OPERATION ? null : row));
+    indexBuilder.addSplitUpdateEventKey(operation, originalTransaction, bucket.get(), rowId);
     writer.addRow(item);
   }
 
@@ -346,7 +464,11 @@ public void insert(long currentTransaction, Object row) throws IOException {
       //always true in that case
       rowIdOffset = findRowIdOffsetForInsert();
     }
-    addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    } else {
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
     rowCountDelta++;
   }
 
@@ -354,18 +476,29 @@ public void insert(long currentTransaction, Object row) throws IOException {
   public void update(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
+      rowIdOffset = findRowIdOffsetForInsert();
+    }
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, insertedRows++, row);
+      rowCountDelta++; // An update event creates a new row-id, in case of split-update.
+    } else {
+      addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
     }
-    addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
   }
 
   @Override
   public void delete(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
+      rowIdOffset = findRowIdOffsetForInsert();
+    }
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, insertedRows++, row);
+      rowCountDelta++; // A delete event creates a new row-id, in case of split-update.
+    } else {
+      addSimpleEvent(DELETE_OPERATION, currentTransaction, -1, row);
+      rowCountDelta--;
     }
-    addEvent(DELETE_OPERATION, currentTransaction, -1, row);
-    rowCountDelta--;
-  }
 
   @Override
@@ -465,16 +598,46 @@ public void preFooterWrite(OrcFile.WriterContext context
           UTF8.encode(acidStats.serialize()));
     }
 
-    void addKey(int op, long transaction, int bucket, long rowId) {
+    void addKey(int op, long transaction, int bucket, long rowId,
+        AcidUtils.AcidEventSchemaType acidEventSchemaType) {
+      if (acidEventSchemaType == AcidUtils.AcidEventSchemaType.SPLIT_UPDATE_SCHEMA) {
+        addSplitUpdateEventKey(op, transaction, bucket, rowId);
+      } else {
+        addSimpleEventKey(op, transaction, bucket, rowId);
+      }
+    }
+
+    void addSimpleEventKey(int op, long transaction, int bucket, long rowId) {
+      switch (op) {
+        case INSERT_OPERATION:
+          acidStats.inserts += 1;
+          break;
+        case UPDATE_OPERATION:
+          acidStats.updates += 1;
+          break;
+        case DELETE_OPERATION:
+          acidStats.deletes += 1;
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown operation " + op);
+      }
+      lastTransaction = transaction;
+      lastBucket = bucket;
+      lastRowId = rowId;
+    }
+
+    void addSplitUpdateEventKey(int op, long transaction, int bucket, long rowId) {
       switch (op) {
         case INSERT_OPERATION:
           acidStats.inserts += 1;
          break;
        case UPDATE_OPERATION:
          acidStats.updates += 1;
+          acidStats.inserts += 1; // An update operation also generates an insert.
          break;
        case DELETE_OPERATION:
          acidStats.deletes += 1;
+          acidStats.inserts += 1; // A delete operation also generates an insert.
          break;
        default:
          throw new IllegalArgumentException("Unknown operation " + op);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 8cf261d..1131147 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -100,6 +100,8 @@
   private boolean isMetadataOnly = false;
 
   private boolean isAcidTable;
+
+  private AcidUtils.AcidOperationalProperties acidOperationalProperties;
 
   private transient TableSample tableSample;
@@ -127,6 +129,11 @@ public TableScanDesc(final String alias, List vcs, Table tblMetad
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
     isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
+    if (isAcidTable) {
+      acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata);
+    } else {
+      acidOperationalProperties = null;
+    }
   }
 
   @Override
@@ -158,6 +165,10 @@ public String getTbl() {
   public boolean isAcidTable() {
     return isAcidTable;
   }
+
+  public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() {
+    return acidOperationalProperties;
+  }
 
   @Explain(displayName = "Output", explainLevels = { Level.USER })
   public List getOutputColumnNames() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 6caca98..de8b86e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -128,7 +128,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
     job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
-    overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
+    overrideMRProps(job, t.getParameters());
     if (ci.properties != null) { // override MR properties and general tblproperties if applicable
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
@@ -137,6 +137,11 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
     //to do the final move
     job.setBoolean("mapreduce.map.speculative", false);
+
+    // Set appropriate Acid readers/writers based on the table properties.
+    AcidUtils.setAcidOperationalProperties(job,
+        AcidUtils.getAcidOperationalProperties(t.getParameters()));
+
     return job;
   }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index ddef4a2..d0db894 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -152,16 +152,18 @@ private static String value(OrcStruct event) {
   private Reader createMockReader() throws IOException {
     Reader reader = Mockito.mock(Reader.class, settings);
+    int total_number_of_fields = OrcRecordUpdater.getTotalNumberOfFields(
+        AcidUtils.AcidOperationalProperties.getLegacy().getAcidEventSchemaType());
     RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
-    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row1 = new OrcStruct(total_number_of_fields);
     setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
-    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row2 = new OrcStruct(total_number_of_fields);
     setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
-    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row3 = new OrcStruct(total_number_of_fields);
     setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
-    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row4 = new OrcStruct(total_number_of_fields);
     setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
-    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row5 = new OrcStruct(total_number_of_fields);
     setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
     Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
         .thenReturn(recordReader);
@@ -186,8 +188,8 @@ public void testReaderPair() throws Exception {
     Reader reader = createMockReader();
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
-    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
-        new Reader.Options(), 0);
+    ReaderPair pair = new ReaderPair(AcidUtils.AcidOperationalProperties.getDefault(),
+        key, reader, 20, minKey, maxKey, new Reader.Options(), 0);
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -212,8 +214,8 @@ public void testReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockReader();
-    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
-        new Reader.Options(), 0);
+    ReaderPair pair = new ReaderPair(AcidUtils.AcidOperationalProperties.getDefault(),
+        key, reader, 20, null, null, new Reader.Options(), 0);
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -289,8 +291,8 @@ public void testOriginalReaderPair() throws Exception {
     RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
     RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
     boolean[] includes = new boolean[]{true, true};
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
-        new Reader.Options().include(includes));
+    ReaderPair pair = new OriginalReaderPair(AcidUtils.AcidOperationalProperties.getDefault(),
+        key, reader, 10, minKey, maxKey, new Reader.Options().include(includes));
     RecordReader recordReader = pair.recordReader;
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
@@ -318,8 +320,8 @@ private static ValidTxnList createMaximalTxnList() {
   public void testOriginalReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockOriginalReader();
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
-        new Reader.Options());
+    ReaderPair pair = new OriginalReaderPair(AcidUtils.AcidOperationalProperties.getDefault(),
+        key, reader, 10, null, null, new Reader.Options());
     assertEquals("first", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
@@ -392,16 +394,18 @@ public void testNewBase() throws Exception {
     Mockito.when(reader.getTypes()).thenReturn(types);
     Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
         .thenReturn(recordReader);
-
-    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
+
+    int total_number_of_fields = OrcRecordUpdater.getTotalNumberOfFields(
+        AcidUtils.AcidOperationalProperties.getLegacy().getAcidEventSchemaType());
+    OrcStruct row1 = new OrcStruct(total_number_of_fields);
     setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
-    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row2 = new OrcStruct(total_number_of_fields);
     setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
-    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row3 = new OrcStruct(total_number_of_fields);
     setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
-    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row4 = new OrcStruct(total_number_of_fields);
     setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
-    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct row5 = new OrcStruct(total_number_of_fields);
     setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
 
     Mockito.when(recordReader.hasNext()).
@@ -453,7 +457,8 @@ public void testNewBase() throws Exception {
         (StructObjectInspector) merger.getObjectInspector();
     List fields = eventObjectInspector.getAllStructFieldRefs();
-    assertEquals(OrcRecordUpdater.FIELDS, fields.size());
+
+    assertEquals(total_number_of_fields, fields.size());
     assertEquals("operation",
         fields.get(OrcRecordUpdater.OPERATION).getFieldName());
     assertEquals("currentTransaction",
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index 67c473e..03732d9 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -49,7 +49,8 @@
   @Test
   public void testAccessors() throws Exception {
-    OrcStruct event = new OrcStruct(OrcRecordUpdater.FIELDS);
+    OrcStruct event = new OrcStruct(OrcRecordUpdater.getTotalNumberOfFields(
+        AcidUtils.AcidOperationalProperties.getLegacy().getAcidEventSchemaType()));
     event.setFieldValue(OrcRecordUpdater.OPERATION,
         new IntWritable(OrcRecordUpdater.INSERT_OPERATION));
     event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,