diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 680b623..96871a8 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -260,6 +260,7 @@ private static URL checkConfigFile(File f) { HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, HiveConf.ConfVars.HIVE_TXN_MANAGER, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, @@ -1748,7 +1749,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Set this to true so that when attempt to acquire a lock on resource times out, the current state" + " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), - + + HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0, + "Operational properties that can be used to choose the appropriate set of readers/writers " + + "for various types of allowed Acid files."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + "rejected, until this number goes below the limit."), diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 14f7316..fddab85 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -133,6 +133,9 @@ public void configureInputJobProperties(TableDesc tableDesc, boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties); AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable); + AcidUtils.AcidOperationalProperties acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(tableProperties); + AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties); } } catch (IOException e) { throw new IllegalStateException("Failed to set output path", e); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index 0c6b9ea..8503702 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Properties; public abstract class AbstractRecordWriter implements RecordWriter { @@ -238,10 +239,16 @@ public void closeBatch() throws StreamingIOFailure { private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { + // Initialize table properties from the table parameters. This is needed because the table + // may define parameters that affect how its records should be written. The table parameter + // 'transactional_properties' is one such example.
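As a rough illustration of how those parameters are consumed downstream (a minimal sketch, not part of the patch; the property values are made up), the writer side can resolve them through the Properties-based helper that this change adds to AcidUtils:

    import java.util.Properties;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    Properties tblProps = new Properties();
    tblProps.setProperty("transactional", "true");
    tblProps.setProperty("transactional_properties", "default");
    AcidUtils.AcidOperationalProperties p = AcidUtils.getAcidOperationalProperties(tblProps);
    // For the "default" value: p.isSplitUpdate() == true, p.isHashBasedMerge() == false.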
+ Properties tblProperties = new Properties(); + tblProperties.putAll(tbl.getParameters()); return outf.getRecordUpdater(partitionPath, new AcidOutputFormat.Options(conf) .inspector(getSerde().getObjectInspector()) .bucket(bucketId) + .tableProperties(tblProperties) .minimumTransactionId(minTxnId) .maximumTransactionId(maxTxnID) .statementId(-1) diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 4d92b73..84cc4f6 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -1474,5 +1474,6 @@ const string FILE_OUTPUT_FORMAT = "file.outputformat", const string META_TABLE_STORAGE = "storage_handler", const string TABLE_IS_TRANSACTIONAL = "transactional", const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction", +const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties", diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp index f982bf2..1cbd176 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp @@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() { TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } }}} // namespace diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h index ae14bd1..3d068c3 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h @@ -38,6 +38,7 @@ class hive_metastoreConstants { std::string META_TABLE_STORAGE; std::string TABLE_IS_TRANSACTIONAL; std::string TABLE_NO_AUTO_COMPACT; + std::string TABLE_TRANSACTIONAL_PROPERTIES; }; extern const hive_metastoreConstants g_hive_metastore_constants; diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java index 5a666f2..8de8896 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java @@ -82,4 +82,6 @@ public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction"; + public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + } diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index f505208..10b1d97 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -18842,6 +18842,7 @@ final class Constant extends \Thrift\Type\TConstant { static protected $META_TABLE_STORAGE; static protected $TABLE_IS_TRANSACTIONAL; static protected $TABLE_NO_AUTO_COMPACT; + static protected $TABLE_TRANSACTIONAL_PROPERTIES; static protected function init_DDL_TIME() { return "transient_lastDdlTime"; @@ -18934,6 +18935,10 @@ final class Constant extends \Thrift\Type\TConstant { static protected function init_TABLE_NO_AUTO_COMPACT() { return "no_auto_compaction"; } + + static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() { + return "transactional_properties"; + } } diff --git metastore/src/gen/thrift/gen-py/hive_metastore/constants.py 
metastore/src/gen/thrift/gen-py/hive_metastore/constants.py index d1c07a5..5100236 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/constants.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/constants.py @@ -32,3 +32,4 @@ META_TABLE_STORAGE = "storage_handler" TABLE_IS_TRANSACTIONAL = "transactional" TABLE_NO_AUTO_COMPACT = "no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties" diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb index eeccc84..6aa7143 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb @@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional" TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction" +TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 3e74675..524d77e 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -17,20 +17,29 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent; import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent; import org.apache.hadoop.hive.metastore.events.PreEventContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - final class TransactionalValidationListener extends MetaStorePreEventListener { public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class); + + // Mirrored from org.apache.hadoop.hive.ql.io.AcidUtils. We could have imported the constant but + // that would create a cyclic dependency between hive.metastore and hive.ql, hence is duplicated. 
+ public static final String DEFAULT_VALUE_STRING = "default"; TransactionalValidationListener(Configuration conf) { super(conf); @@ -80,6 +89,7 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw if (transactionalValuePresent) { //normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue); + initializeTransactionalProperties(newTable); } if ("true".equalsIgnoreCase(transactionalValue)) { if (!conformToAcid(newTable)) { @@ -157,6 +167,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); + initializeTransactionalProperties(newTable); return; } @@ -187,4 +198,38 @@ private boolean conformToAcid(Table table) throws MetaException { return true; } + + private void initializeTransactionalProperties(Table table) throws MetaException { + // All new versions of Acid tables created after the introduction of Acid version/type system + // must have TRANSACTIONAL_PROPERTIES property defined, otherwise they will be assumed of + // legacy type. + + // Initialize transaction table properties with default string value. + String tableTransactionalProperties = DEFAULT_VALUE_STRING; + + Map parameters = table.getParameters(); + if (parameters != null) { + Set keys = new HashSet<>(parameters.keySet()); + for (String key : keys) { + if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) { + tableTransactionalProperties = parameters.get(key); + parameters.remove(key); + String validationError = validateTransactionalProperties(tableTransactionalProperties); + if (validationError != null) { + throw new MetaException("Invalid transactional properties specified for the " + + "table with the error " + validationError); + } + break; + } + } + } + + parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + tableTransactionalProperties); + } + + private String validateTransactionalProperties(String transactionalProperties) { + // TODO: Implement validation logic for transactional properties + return null; // All checks passed, return null. 
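The validation itself is left as a TODO above; one plausible shape for it (purely a sketch using the value names introduced by this patch, not the actual implementation) is to accept the two presets and otherwise require every '|'-separated token to be a recognized option:

    private String validateTransactionalProperties(String transactionalProperties) {
      if (transactionalProperties == null
          || transactionalProperties.equalsIgnoreCase("default")
          || transactionalProperties.equalsIgnoreCase("legacy")) {
        return null;
      }
      for (String token : transactionalProperties.split("\\|")) {
        if (!token.equalsIgnoreCase("split_update") && !token.equalsIgnoreCase("hash_merge")) {
          return "unrecognized option '" + token + "' in transactional_properties";
        }
      }
      return null; // all tokens recognized
    }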
+ } } \ No newline at end of file diff --git orc/src/java/org/apache/orc/impl/TreeReaderFactory.java orc/src/java/org/apache/orc/impl/TreeReaderFactory.java index c4a2093..5f3ea53 100644 --- orc/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ orc/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -2037,7 +2037,7 @@ public static TreeReader createTreeReader(TypeDescription readerType, return new NullTreeReader(0); } TypeDescription.Category readerTypeCategory = readerType.getCategory(); - if (!fileType.equals(readerType) && + if (!fileType.getCategory().equals(readerTypeCategory) && (readerTypeCategory != TypeDescription.Category.STRUCT && readerTypeCategory != TypeDescription.Category.MAP && readerTypeCategory != TypeDescription.Category.LIST && diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index dff1815..1e518ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -79,6 +79,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext HiveInputFormat.pushFilters(job, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } sink = work.getSink(); fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 57b6c67..584eff4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -212,6 +212,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties()); ts.passExecContext(getExecContext()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index a26d2b8..6bb2b8d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -472,6 +472,7 @@ private void initializeOperators(Map fetchOpJobConfMap) HiveInputFormat.pushFilters(jobClone, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 36f38f6..cb41d3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,9 +18,14 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; @@ -29,22 +34,17 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. @@ -59,7 +59,9 @@ public boolean accept(Path path) { return path.getName().startsWith(BASE_PREFIX); } }; - public static final String DELTA_PREFIX = "delta_"; + public static final String DELTA_PREFIX = "delta_"; + public static final String DELETE_DELTA_PREFIX = "delete_delta_"; + public static final String DELETE_DELTA_PREFIX_MODIFIER = "delete_"; public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; public static final PathFilter deltaFileFilter = new PathFilter() { @Override @@ -67,6 +69,12 @@ public boolean accept(Path path) { return path.getName().startsWith(DELTA_PREFIX); } }; + public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(DELETE_DELTA_PREFIX); + } + }; public static final String BUCKET_PREFIX = "bucket_"; public static final PathFilter bucketFileFilter = new PathFilter() { @Override @@ -141,6 +149,23 @@ public static String deltaSubdir(long min, long max, int statementId) { return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); } + /** + * This is format of delete delta dir name prior to Hive 1.3.x + */ + public static String deleteDeltaSubdir(long min, long max) { + return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + + String.format(DELTA_DIGITS, max); + } + + /** + * Each write statement in a transaction creates its own delete delta dir, + * when split-update acid operational property is turned on. + * @since 1.3.x + */ + public static String deleteDeltaSubdir(long min, long max, int statementId) { + return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); + } + public static String baseDir(long txnId) { return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); } @@ -171,6 +196,33 @@ public static Path createFilename(Path directory, } return createBucketFile(new Path(directory, subdir), options.getBucket()); } + + /** + * Returns a corresponding delete event filepath from a given original delta filepath. + * Typically this means that this function will convert a hadoop file path with the + * pattern "*.delta.*" to "*.delete_delta.*" filepath. 
+ * @param origDeltaFilePath the original delta file path + * @return the filename that points to a corresponding delete event filepath + */ + public static Path getDeleteEventFilePath(Path origDeltaFilePath) { + Path deleteEventFilePath = null; + Path parent = origDeltaFilePath.getParent(); + Path trailingChildPath = new Path(origDeltaFilePath.getName()); + boolean isValidConvert = false; + while (parent != null) { + if (trailingChildPath.toUri().toString().startsWith(DELTA_PREFIX)) { + isValidConvert = true; + break; + } + trailingChildPath = new Path(parent.getName(), trailingChildPath); + parent = parent.getParent(); + } + if (isValidConvert) { + deleteEventFilePath = new Path(parent, + DELETE_DELTA_PREFIX_MODIFIER + trailingChildPath.toUri().toString()); + } + return deleteEventFilePath; + } /** * Get the transaction id from a base directory name. @@ -194,11 +246,10 @@ static long parseBase(Path path) { * @return the options used to create that filename */ public static AcidOutputFormat.Options - parseBaseBucketFilename(Path bucketFile, - Configuration conf) { + parseBaseOrDeltaBucketFilename(Path bucketFile, + Configuration conf) { AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf); String filename = bucketFile.getName(); - result.writingBase(true); if (ORIGINAL_PATTERN.matcher(filename).matches()) { int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_'))); @@ -206,18 +257,37 @@ static long parseBase(Path path) { .setOldStyle(true) .minimumTransactionId(0) .maximumTransactionId(0) - .bucket(bucket); + .bucket(bucket) + .writingBase(true); } else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); - result - .setOldStyle(false) - .minimumTransactionId(0) - .maximumTransactionId(parseBase(bucketFile.getParent())) - .bucket(bucket); + if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { + result + .setOldStyle(false) + .minimumTransactionId(0) + .maximumTransactionId(parseBase(bucketFile.getParent())) + .bucket(bucket) + .writingBase(true); + } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { + ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX); + result + .setOldStyle(false) + .minimumTransactionId(parsedDelta.minTransaction) + .maximumTransactionId(parsedDelta.maxTransaction) + .bucket(bucket); + } } else { result.setOldStyle(true).bucket(-1).minimumTransactionId(0) - .maximumTransactionId(0); + .maximumTransactionId(0) + .writingBase(true); } return result; } @@ -246,6 +316,167 @@ public static DataOperationType toDataOperationType(Operation op) { throw new IllegalArgumentException("Unexpected Operation: " + op); } } + + public enum AcidEventSchemaType { + SIMPLE_SCHEMA("simple_schema"), + SPLIT_UPDATE_SCHEMA("split_update_schema"); + + private final String acidEventSchemaType; + private AcidEventSchemaType(String acidEventSchemaType) { + this.acidEventSchemaType = acidEventSchemaType; + } + + @Override + public String toString() { + return acidEventSchemaType; + } + } + + public static class AcidOperationalProperties { + private int description = 0x00; + public static final int SPLIT_UPDATE_BIT = 0x01; + 
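To make the new naming concrete (a small sketch; the warehouse path, transaction id and statement id are made up), the helpers above generate the two parallel directory families, and getDeleteEventFilePath() maps an insert-side bucket file to its delete-side twin:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    String d  = AcidUtils.deltaSubdir(7, 7, 0);        // e.g. delta_0000007_0000007_0000
    String dd = AcidUtils.deleteDeltaSubdir(7, 7, 0);  // e.g. delete_delta_0000007_0000007_0000
    Path bucket = new Path("/warehouse/t/" + d + "/bucket_00000");
    Path deleteBucket = AcidUtils.getDeleteEventFilePath(bucket);
    // deleteBucket -> /warehouse/t/delete_delta_0000007_0000007_0000/bucket_00000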
public static final String SPLIT_UPDATE_STRING = "split_update"; + public static final int HASH_BASED_MERGE_BIT = 0x02; + public static final String HASH_BASED_MERGE_STRING = "hash_merge"; + public static final String DEFAULT_VALUE_STRING = "default"; + public static final String LEGACY_VALUE_STRING = "legacy"; + + private AcidOperationalProperties() { + } + + /** + * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables + * that were created before the ACID type system based on operational properties was put in place. + * @return the acidOperationalProperties object + */ + public static AcidOperationalProperties getLegacy() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + // In legacy mode, none of these properties are turned on. + return obj; + } + + /** + * Returns an acidOperationalProperties object that represents default ACID behavior for tables + * that do not explicitly specify/override the default behavior. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties getDefault() { + AcidOperationalProperties obj = new AcidOperationalProperties(); + obj.setSplitUpdate(true); + obj.setHashBasedMerge(false); + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded string. + * @param propertiesStr an encoded string representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseString(String propertiesStr) { + if (propertiesStr == null) { + return AcidOperationalProperties.getLegacy(); + } + if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { + return AcidOperationalProperties.getDefault(); + } + if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { + return AcidOperationalProperties.getLegacy(); + } + AcidOperationalProperties obj = new AcidOperationalProperties(); + String[] options = propertiesStr.split("\\|"); // '|' must be escaped since split() takes a regex + for (String option : options) { + switch (option) { + case SPLIT_UPDATE_STRING: + obj.setSplitUpdate(true); + break; + case HASH_BASED_MERGE_STRING: + obj.setHashBasedMerge(true); + break; + default: + break; + } + } + return obj; + } + + /** + * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer. + * @param properties an encoded 32-bit integer representing the acidOperationalProperties. + * @return the acidOperationalProperties object. + */ + public static AcidOperationalProperties parseInt(int properties) { + AcidOperationalProperties obj = new AcidOperationalProperties(); + if ((properties & SPLIT_UPDATE_BIT) > 0) { + obj.setSplitUpdate(true); + } + if ((properties & HASH_BASED_MERGE_BIT) > 0) { + obj.setHashBasedMerge(true); + } + return obj; + } + + /** + * Sets the split update property for ACID operations based on the boolean argument. + * When split update is turned on, an update ACID event is interpreted as a combination of + * a delete event followed by an insert event. + * @param isSplitUpdate a boolean property that turns on split update when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) { + description = (isSplitUpdate + ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT)); + return this; + } + + /** + * Sets the hash-based merge property for ACID operations, which combines delta files using + * a GRACE hash join based approach when turned on. (Currently unimplemented!)
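Because the whole encoding is just two bit flags, the integer and string forms round-trip predictably (sketch; the variable names are mine):

    AcidUtils.AcidOperationalProperties fromInt =
        AcidUtils.AcidOperationalProperties.parseInt(0x01);
    // fromInt.isSplitUpdate() == true, fromInt.isHashBasedMerge() == false, fromInt.toInt() == 1
    AcidUtils.AcidOperationalProperties preset =
        AcidUtils.AcidOperationalProperties.parseString("default");
    // "default" is equivalent to setting only the split-update bit; "legacy" leaves both bits off.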
+ * @param isHashBasedMerge a boolean property that turns on hash-based merge when true. + * @return the acidOperationalProperties object. + */ + public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) { + description = (isHashBasedMerge + ? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT)); + return this; + } + + public boolean isSplitUpdate() { + return (description & SPLIT_UPDATE_BIT) > 0; + } + + /** + * Returns the appropriate schema type for the decorated ACID columns based on the + * acid operational properties defined. + * @return the acid event schema type. + */ + public AcidEventSchemaType getAcidEventSchemaType() { + if (isSplitUpdate()) { + return AcidEventSchemaType.SPLIT_UPDATE_SCHEMA; + } else { + return AcidEventSchemaType.SIMPLE_SCHEMA; + } + } + + public boolean isHashBasedMerge() { + return (description & HASH_BASED_MERGE_BIT) > 0; + } + + public int toInt() { + return description; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + if (isSplitUpdate()) { + str.append("|" + SPLIT_UPDATE_STRING); + } + if (isHashBasedMerge()) { + str.append("|" + HASH_BASED_MERGE_STRING); + } + return str.toString(); + } + } public static interface Directory { @@ -255,6 +486,8 @@ public static DataOperationType toDataOperationType(Operation op) { */ Path getBaseDirectory(); + List getDeltaFiles(); + /** * Get the list of original files. Not {@code null}. * @return the list of original files (eg. 000000_0) @@ -381,6 +614,35 @@ else if(statementId != parsedDelta.statementId) { List result = new ArrayList<>(deltas.size()); AcidInputFormat.DeltaMetaData last = null; for(ParsedDelta parsedDelta : deltas) { + if (!parsedDelta.getPath().getName().startsWith(DELTA_PREFIX)) { + continue; // when serializing the deltas, skip anything which is not a delta file. + } + if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { + last.getStmtIds().add(parsedDelta.getStatementId()); + continue; + } + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList()); + result.add(last); + if(parsedDelta.statementId >= 0) { + last.getStmtIds().add(parsedDelta.getStatementId()); + } + } + return result; + } + + /** + * Convert the list of delete deltas into an equivalent list of begin/end + * transaction id pairs. Assumes {@code deltas} is sorted. + * @param deleteDeltas + * @return the list of transaction ids to serialize + */ + public static List serializeDeleteDeltas(List deleteDeltas) { + List result = new ArrayList<>(deleteDeltas.size()); + AcidInputFormat.DeltaMetaData last = null; + for(ParsedDelta parsedDelta : deleteDeltas) { + if (!parsedDelta.getPath().getName().startsWith(DELETE_DELTA_PREFIX)) { + continue; // when serializing the deltas, skip anything which is not a delta file. + } if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { last.getStmtIds().add(parsedDelta.getStatementId()); continue; @@ -416,16 +678,48 @@ else if(statementId != parsedDelta.statementId) { } return results.toArray(new Path[results.size()]); } + + /** + * Convert the list of begin/end transaction id pairs to a list of delete delta + * directories. 
Note that there may be multiple delta files for the exact same txn range starting + * with 1.3.x; + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} + * @param root the root directory + * @param deltas list of begin/end transaction id pairs + * @return the list of delta paths + */ + public static Path[] deserializeDeleteDeltas(Path root, final List deltas) throws IOException { + List results = new ArrayList(deltas.size()); + for(AcidInputFormat.DeltaMetaData dmd : deltas) { + if(dmd.getStmtIds().isEmpty()) { + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); + continue; + } + for(Integer stmtId : dmd.getStmtIds()) { + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); + } + } + return results.toArray(new Path[results.size()]); + } - private static ParsedDelta parseDelta(FileStatus path) { - ParsedDelta p = parsedDelta(path.getPath()); + public static ParsedDelta parsedDelta(Path deltaDir) { + String deltaDirName = deltaDir.getName(); + if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) { + return parsedDelta(deltaDir, DELETE_DELTA_PREFIX); + } + return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix + } + + private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) { + ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix); return new ParsedDelta(p.getMinTransaction(), p.getMaxTransaction(), path, p.statementId); } - public static ParsedDelta parsedDelta(Path deltaDir) { + + public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) { String filename = deltaDir.getName(); - if (filename.startsWith(DELTA_PREFIX)) { - String rest = filename.substring(DELTA_PREFIX.length()); + if (filename.startsWith(deltaPrefix)) { + String rest = filename.substring(deltaPrefix.length()); int split = rest.indexOf('_'); int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId long min = Long.parseLong(rest.substring(0, split)); @@ -439,7 +733,7 @@ public static ParsedDelta parsedDelta(Path deltaDir) { return new ParsedDelta(min, max, null, statementId); } throw new IllegalArgumentException(deltaDir + " does not start with " + - DELTA_PREFIX); + deltaPrefix); } /** @@ -455,7 +749,8 @@ public static boolean isAcid(Path directory, for(FileStatus file: fs.listStatus(directory)) { String filename = file.getPath().getName(); if (filename.startsWith(BASE_PREFIX) || - filename.startsWith(DELTA_PREFIX)) { + filename.startsWith(DELTA_PREFIX) || + filename.startsWith(DELETE_DELTA_PREFIX)) { if (file.isDir()) { return true; } @@ -477,29 +772,42 @@ public static Directory getAcidState(Path directory, private FileStatus status; private long txn; } + + public static Directory getAcidState(Path directory, + Configuration conf, + ValidTxnList txnList, + boolean useFileIds, + boolean ignoreEmptyFiles) throws IOException { + FileSystem fs = directory.getFileSystem(conf); + return getAcidState(fs, directory, conf, txnList, useFileIds, ignoreEmptyFiles); + } + + /** * Get the ACID state of the given directory. It finds the minimal set of * base and diff directories. Note that because major compactions don't * preserve the history, we can't use a base directory that includes a * transaction id that we must exclude. 
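As a quick illustration of the deserialization helper added above (a sketch; the root path and transaction ids are made up, and the caller already declares IOException as in the surrounding code):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidInputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    List<AcidInputFormat.DeltaMetaData> metas = new ArrayList<>();
    metas.add(new AcidInputFormat.DeltaMetaData(7, 7, new ArrayList<Integer>()));
    Path[] deleteDirs = AcidUtils.deserializeDeleteDeltas(new Path("/warehouse/t"), metas);
    // deleteDirs -> { /warehouse/t/delete_delta_0000007_0000007 }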
+ * @param fs the filesystem to be used to retrieve the file status * @param directory the partition directory to analyze * @param conf the configuration * @param txnList the list of transactions that we are reading * @return the state of the directory * @throws IOException */ - public static Directory getAcidState(Path directory, + public static Directory getAcidState(FileSystem fs, + Path directory, Configuration conf, ValidTxnList txnList, boolean useFileIds, boolean ignoreEmptyFiles ) throws IOException { - FileSystem fs = directory.getFileSystem(conf); final List deltas = new ArrayList(); List working = new ArrayList(); List originalDirectories = new ArrayList(); final List obsolete = new ArrayList(); + final List deltaFilesWithId = new ArrayList(); List childrenWithId = null; if (useFileIds) { try { @@ -514,13 +822,13 @@ public static Directory getAcidState(Path directory, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { getChildState(child.getFileStatus(), child, txnList, working, - originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles); + originalDirectories, original, deltaFilesWithId, obsolete, bestBase, ignoreEmptyFiles); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { - getChildState( - child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles); + getChildState(child, null, txnList, working, + originalDirectories, original, deltaFilesWithId, obsolete, bestBase, ignoreEmptyFiles); } } @@ -550,14 +858,18 @@ public static Directory getAcidState(Path directory, //subject to list of 'exceptions' in 'txnList' (not show in above example). long current = bestBase.txn; int lastStmtId = -1; + ParsedDelta prev = null; for(ParsedDelta next: working) { + boolean isDeltaAdded = false; if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? if (txnList.isTxnRangeValid(current+1, next.maxTransaction) != ValidTxnList.RangeResponse.NONE) { deltas.add(next); + isDeltaAdded = true; current = next.maxTransaction; lastStmtId = next.statementId; + prev = next; } } else if(next.maxTransaction == current && lastStmtId >= 0) { @@ -565,10 +877,36 @@ else if(next.maxTransaction == current && lastStmtId >= 0) { //generate multiple delta files with the same txnId range //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete deltas.add(next); + isDeltaAdded = true; + prev = next; + } + else if (prev != null && next.maxTransaction == prev.maxTransaction + && next.minTransaction == prev.minTransaction + && next.statementId == prev.statementId) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except + // the path. This may happen when we have split update and we have two types of delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for same the txn range. 
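The two families also stay apart in the path filters, since "delete_delta_" does not begin with "delta_" (a quick check with an illustrative path):

    Path dd = new Path("/warehouse/t/delete_delta_0000007_0000007");
    // AcidUtils.deltaFileFilter.accept(dd)           == false
    // AcidUtils.deleteEventDeltaDirFilter.accept(dd) == true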
+ deltas.add(next); + isDeltaAdded = true; + prev = next; } else { obsolete.add(next.path); } + if (isDeltaAdded) { + try { + List deltaHdfsFileStatuses + = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter); + deltaFilesWithId.addAll(deltaHdfsFileStatuses); + } catch (Throwable t) { + // Failed to get files with ID; using regular API + List deltaFileStatuses = + HdfsUtils.listLocatedStatus(fs, next.getPath(), hiddenFileFilter); + for (FileStatus deltaFileStatus : deltaFileStatuses) { + deltaFilesWithId.add(createOriginalObj(null, deltaFileStatus)); + } + } + } } final Path base = bestBase.status == null ? null : bestBase.status.getPath(); @@ -596,12 +934,18 @@ public Path getBaseDirectory() { public List getObsolete() { return obsolete; } + + @Override + public List getDeltaFiles() { + return deltaFilesWithId; + } }; } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, - List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) { + List original, List deltaFilesWithId, + List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { @@ -616,8 +960,11 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } else { obsolete.add(child); } - } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) { - ParsedDelta delta = parseDelta(child); + } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) + && child.isDir()) { + String deltaPrefix = + (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; + ParsedDelta delta = parseDelta(child, deltaPrefix); if (txnList.isTxnRangeValid(delta.minTransaction, delta.maxTransaction) != ValidTxnList.RangeResponse.NONE) { @@ -745,4 +1092,84 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + + /** + * Sets the acidOperationalProperties in the configuration object argument. + * @param conf Mutable configuration object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties(Configuration conf, + AcidOperationalProperties properties) { + if (properties != null) { + HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt()); + } + } + + /** + * Sets the acidOperationalProperties in the map object argument. + * @param parameters Mutable map object + * @param properties An acidOperationalProperties object to initialize from. + */ + public static void setAcidOperationalProperties( + Map parameters, AcidOperationalProperties properties) { + if (properties != null) { + parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString()); + } + } + + /** + * Returns the acidOperationalProperties for a given table. + * @param table A table object + * @return the acidOperationalProperties object for the corresponding table. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Table table) { + String transactionalProperties = table.getProperty( + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (transactionalProperties == null) { + // If the table does not define any transactional properties, we return a legacy type. 
+ return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(transactionalProperties); + } + + /** + * Returns the acidOperationalProperties for a given configuration. + * @param conf A configuration object + * @return the acidOperationalProperties object for the corresponding configuration. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) { + // If the conf does not define any transactional properties, the parseInt() should receive + // a value of zero, which will set AcidOperationalProperties to a legacy type and return that. + return AcidOperationalProperties.parseInt( + HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES)); + } + + /** + * Returns the acidOperationalProperties for a given set of properties. + * @param props A properties object + * @return the acidOperationalProperties object for the corresponding properties. + */ + public static AcidOperationalProperties getAcidOperationalProperties(Properties props) { + String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the properties does not define any transactional properties, we return a legacy type. + return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } + + /** + * Returns the acidOperationalProperties for a given map. + * @param parameters A parameters object + * @return the acidOperationalProperties object for the corresponding map. + */ + public static AcidOperationalProperties getAcidOperationalProperties( + Map parameters) { + String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + if (resultStr == null) { + // If the parameters does not define any transactional properties, we return a legacy type. 
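Taken together with the conf-based setter above, the job-conf plumbing added in FetchTask, SMBMapJoinOperator, MapredLocalTask and HiveInputFormat boils down to this round trip (sketch on a fresh Configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    Configuration conf = new Configuration();
    AcidUtils.setAcidOperationalProperties(conf, AcidUtils.AcidOperationalProperties.getDefault());
    // hive.txn.operational.properties now carries getDefault().toInt(), i.e. the split-update bit.
    boolean splitUpdate = AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate(); // true
    // On an unconfigured conf the value defaults to 0, which parseInt() maps to the legacy behavior.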
+ return AcidOperationalProperties.getLegacy(); + } + return AcidOperationalProperties.parseString(resultStr); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 227a051..24990d6 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -659,6 +659,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass pushFilters(jobConf, ts); AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable()); + AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties()); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 69d58d6..bcc0480 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; import org.apache.hadoop.hive.ql.io.BatchToRowReader; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; @@ -1020,7 +1021,7 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, + AcidUtils.Directory dirInfo = AcidUtils.getAcidState(fs, dir, context.conf, context.transactionList, useFileIds, true); Path base = dirInfo.getBaseDirectory(); // find the base files (original or new style) @@ -1698,7 +1699,20 @@ public float getProgress() throws IOException { } else { root = path; } - final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); + + // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat. + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + + // The deltas are decided based on whether split-update has been turned on for the table or not. + // When split-update is turned off, everything in the delta_x_y/ directory should be treated + // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory + // need to be considered as delta, because files in delta_x_y/ will be processed as base files + // since they only have insert events in them. + final Path[] deltas = + acidOperationalProperties.isSplitUpdate() ? 
+ AcidUtils.deserializeDeleteDeltas(root, split.getDeltas()) + : AcidUtils.deserializeDeltas(root, split.getDeltas()); final Configuration conf = options.getConfiguration(); @@ -1718,7 +1732,7 @@ public float getProgress() throws IOException { setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); if (split.hasBase()) { - bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) + bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf) .getBucket(); OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf) .maxLength(split.getFileLength()); @@ -1874,15 +1888,59 @@ private static boolean isStripeSatisfyPredicate( List original = dirInfo.getOriginalFiles(); List deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); boolean[] covered = new boolean[context.numBuckets]; - boolean isOriginal = base == null; + boolean isOriginal = (base == null) && (!original.isEmpty()); + + // Determine the transactional_properties of the table from the job conf stored in context. + // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(), + // & therefore we should be able to retrieve them here and determine appropriate behavior. + String transactional_properties = + context.conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.AcidOperationalProperties.parseString(transactional_properties); + + if (acidOperationalProperties.isSplitUpdate()) { + // If we have split-update turned on for this table, then the delta events have already been + // split into two directories- delta_x_y/ and delete_delta_x_y/. + // When you have split-update turned on, the insert events go to delta_x_y/ directory and all + // the delete events go to delete_x_y/. An update event will generate two events- + // a delete event for the old record that is put into delete_delta_x_y/, + // followed by an insert event for the updated record put into the usual delta_x_y/. + // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/ + // can be treated like base files. Hence, each of these are added to baseOrOriginalFiles list. + + List deltaFiles = dirInfo.getDeltaFiles(); + // We make a copy and do not mutate the original list. + baseOrOriginalFiles = new ArrayList(baseOrOriginalFiles); + for (HdfsFileStatusWithId deltaFile : deltaFiles) { + // All the delta_x_y files are being considered as base files for split-update. + // The path structure of the deltaFile is assumed to be ../delta_x_y/bucket_a. + Path deltaDirRoot = deltaFile.getFileStatus().getPath().getParent(); + if (deltaDirRoot.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + baseOrOriginalFiles.add(deltaFile); + } + } + + // Now, once we have considered the delta_x_y as base, we need to remove them from the + // original deltas, so that the 'deltas' only contain delete_delta_x_y directories. + List parsedDeltas = dirInfo.getCurrentDirectories(); + List deleteParsedDeltas = new ArrayList(); + for (ParsedDelta parsedDelta : parsedDeltas) { + if (parsedDelta.getPath().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + deleteParsedDeltas.add(parsedDelta); + } + } + // Update the 'deltas' to include only the delete_deltas. 
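The renamed parseBaseOrDeltaBucketFilename() is what allows those insert-only delta buckets to be handled like base files; given a bucket path it recovers the options that produced it (sketch with an illustrative path; conf is the job configuration in scope):

    AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename(
        new Path("/warehouse/t/delta_0000007_0000007/bucket_00000"), conf);
    // opts.getBucket() == 0 and the min/max transaction ids are both 7; the caller then marks the
    // options with writingBase(true) when the file is consumed as a base, as the split code below does.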
+ deltas = AcidUtils.serializeDeleteDeltas(deleteParsedDeltas); + } // if we have a base to work from - if (base != null || !original.isEmpty()) { + if (base != null || !original.isEmpty() || !baseOrOriginalFiles.isEmpty()) { long totalFileSize = 0; for (HdfsFileStatusWithId child : baseOrOriginalFiles) { totalFileSize += child.getFileStatus().getLen(); - AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename + AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename (child.getFileStatus().getPath(), context.conf); + opts.writingBase(true); int b = opts.getBucket(); // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. @@ -1933,6 +1991,10 @@ private static boolean isStripeSatisfyPredicate( Path bucketFile; if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) { bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); + } else if (baseDirectory.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + // This is invoked when we have split-update and delta files consisting of only + // the insert events are also considered base files. + bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); } else { isOriginal = true; bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf), diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index b0f8c8b..d0552f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; +import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties; import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -293,26 +294,57 @@ public RecordUpdater getRecordUpdater(Path path, .rowIndexStride(0); } final OrcRecordUpdater.KeyIndexBuilder watcher = - new OrcRecordUpdater.KeyIndexBuilder(); + new OrcRecordUpdater.KeyIndexBuilder(); opts.inspector(options.getInspector()) .callback(watcher); final Writer writer = OrcFile.createWriter(filename, opts); + final Writer deleteEventWriter; + final OrcRecordUpdater.KeyIndexBuilder deleteEventWatcher; + + final AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + if (acidOperationalProperties.isSplitUpdate() && !options.isWritingBase()) { + deleteEventWatcher = new OrcRecordUpdater.KeyIndexBuilder(); + opts.callback(deleteEventWatcher); + Path deleteEventFilename = AcidUtils.getDeleteEventFilePath(filename); + deleteEventWriter = OrcFile.createWriter(deleteEventFilename, opts); + } else { + deleteEventWriter = null; + deleteEventWatcher = null; + } return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { @Override public void write(Writable w) throws IOException { OrcStruct orc = (OrcStruct) w; - watcher.addKey( - ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(), - ((LongWritable) - orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), - ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), - ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); - writer.addRow(w); + int operation = 
((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(); + if (!acidOperationalProperties.isSplitUpdate() + || operation != OrcRecordUpdater.DELETE_OPERATION) { + watcher.addKey( + ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(), + ((LongWritable) + orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), + ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), + ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); + writer.addRow(w); + } else { + // This is a split-update with delete operation, hence we will be writing all delete + // events to a separate file. + deleteEventWriter.addRow(w); + deleteEventWatcher.addKey( + ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(), + ((LongWritable) + orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), + ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), + ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); + } } @Override public void close(boolean abort) throws IOException { writer.close(); + if (deleteEventWriter != null) { + deleteEventWriter.close(); + } } }; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index e577961..e13543c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; @@ -79,9 +80,12 @@ private static final Charset UTF8 = Charset.forName("UTF-8"); private final AcidOutputFormat.Options options; + private final AcidUtils.AcidOperationalProperties acidOperationalProperties; private final Path path; + private Path deleteEventPath; private final FileSystem fs; private Writer writer; + private Writer deleteEventWriter; private final FSDataOutputStream flushLengths; private final OrcStruct item; private final IntWritable operation = new IntWritable(); @@ -95,13 +99,16 @@ // because that is monotonically increasing to give new unique row ids. 
private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private final KeyIndexBuilder deleteEventIndexBuilder = new KeyIndexBuilder(); private StructField recIdField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in private StructField originalTxnField = null; // field inside recId to look for original txn in + private StructField bucketField = null; // field inside recId to look for bucket in private StructObjectInspector rowInspector; // OI for the original row private StructObjectInspector recIdInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier private LongObjectInspector origTxnInspector; // OI for the original txn inside the record + private IntObjectInspector bucketInspector; // OI for the bucket inside the record // identifer static int getOperation(OrcStruct struct) { @@ -180,8 +187,19 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; + // Initialize acidOperationalProperties based on table properties, and + // if they are not available, see if we can find it in the job configuration. + if (options.getTableProperties() != null) { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getTableProperties()); + } else { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + } this.bucket.set(options.getBucket()); - this.path = AcidUtils.createFilename(path, options); + this.path = AcidUtils.createFilename(path, options); + this.deleteEventWriter = null; + this.deleteEventPath = null; FileSystem fs = options.getFilesystem(); if (fs == null) { fs = path.getFileSystem(options.getConfiguration()); @@ -241,6 +259,14 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.inspector(createEventSchema(findRecId(options.getInspector(), options.getRecordIdColumn()))); this.writer = OrcFile.createWriter(this.path, writerOptions); + if (this.acidOperationalProperties.isSplitUpdate()) { + // If this is a split-update, we also open a writer that would write update/delete events to + // a separate file. This writes to a file in directory which starts with "delete_delta_..." + this.deleteEventPath = AcidUtils.getDeleteEventFilePath(this.path); + // Change the indexBuilder callback too for the deleteEvent file. + writerOptions.fileSystem(fs).callback(deleteEventIndexBuilder); + this.deleteEventWriter = OrcFile.createWriter(deleteEventPath, writerOptions); + } item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); item.setFieldValue(CURRENT_TRANSACTION, currentTransaction); @@ -265,12 +291,13 @@ private long findRowIdOffsetForInsert() throws IOException { * Then, * 1. find the same bucket file in previous delta dir for this txn * 2. read the footer and get AcidStats which has insert count - * 2.1 if AcidStats.inserts>0 done + * 2.1 if AcidStats.inserts>0 add to the insert count. 
* else go to previous delta file * For example, consider insert/update/insert case...*/ if(options.getStatementId() <= 0) { return 0;//there is only 1 statement in this transaction (so far) } + long totalInserts = 0; for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); if(!fs.exists(matchingBucket)) { @@ -280,12 +307,10 @@ private long findRowIdOffsetForInsert() throws IOException { //no close() on Reader?! AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); if(acidStats.inserts > 0) { - return acidStats.inserts; + totalInserts += acidStats.inserts; } } - //if we got here, we looked at all delta files in this txn, prior to current statement and didn't - //find any inserts... - return 0; + return totalInserts; } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. @@ -306,6 +331,8 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { // in RecordIdentifier is transactionId, bucketId, rowId originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); + bucketField = fields.get(1); + bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector(); rowIdField = fields.get(2); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); @@ -315,7 +342,7 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { } } - private void addEvent(int operation, long currentTransaction, long rowId, Object row) + private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); @@ -333,11 +360,43 @@ else if(operation == INSERT_OPERATION) { } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); } + private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) + throws IOException { + if (operation == INSERT_OPERATION) { + // Just insert the record in the usual way, i.e., default to the simple behavior. + addSimpleEvent(operation, currentTransaction, rowId, row); + return; + } + this.operation.set(operation); + this.currentTransaction.set(currentTransaction); + Object rowValue = rowInspector.getStructFieldData(row, recIdField); + long originalTransaction = origTxnInspector.get( + recIdInspector.getStructFieldData(rowValue, originalTxnField)); + long prevRowId = rowIdInspector.get( + recIdInspector.getStructFieldData(rowValue, rowIdField)); + + if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + // A delete/update generates a delete event for the original row. + this.rowId.set(prevRowId); + this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); + item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. 
@@ -306,6 +331,8 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
      // in RecordIdentifier is transactionId, bucketId, rowId
      originalTxnField = fields.get(0);
      origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+      bucketField = fields.get(1);
+      bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector();
      rowIdField = fields.get(2);
      rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();

@@ -315,7 +342,7 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
     }
   }

-  private void addEvent(int operation, long currentTransaction, long rowId, Object row)
+  private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
     this.currentTransaction.set(currentTransaction);
@@ -333,11 +360,43 @@ else if(operation == INSERT_OPERATION) {
     }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
+    item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
     item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
     indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
     writer.addRow(item);
   }
+  private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
+      throws IOException {
+    if (operation == INSERT_OPERATION) {
+      // Just insert the record in the usual way, i.e., default to the simple behavior.
+      addSimpleEvent(operation, currentTransaction, rowId, row);
+      return;
+    }
+    this.operation.set(operation);
+    this.currentTransaction.set(currentTransaction);
+    Object rowValue = rowInspector.getStructFieldData(row, recIdField);
+    long originalTransaction = origTxnInspector.get(
+        recIdInspector.getStructFieldData(rowValue, originalTxnField));
+    long prevRowId = rowIdInspector.get(
+        recIdInspector.getStructFieldData(rowValue, rowIdField));
+
+    if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      // A delete/update generates a delete event for the original row.
+      this.rowId.set(prevRowId);
+      this.originalTransaction.set(originalTransaction);
+      item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION));
+      item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events.
+      deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), prevRowId);
+      deleteEventWriter.addRow(item);
+    }
+
+    if (operation == UPDATE_OPERATION) {
+      // A new row is also inserted in the usual delta file for an update event.
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
+  }
+
   @Override
   public void insert(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
@@ -346,7 +405,11 @@ public void insert(long currentTransaction, Object row) throws IOException {
       //always true in that case
       rowIdOffset = findRowIdOffsetForInsert();
     }
-    addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    } else {
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
     rowCountDelta++;
   }

@@ -354,8 +417,14 @@ public void insert(long currentTransaction, Object row) throws IOException {
   public void update(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
+      rowIdOffset = findRowIdOffsetForInsert();
+    }
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
+      rowCountDelta++; // An update event creates a new row-id, in case of split-update.
+    } else {
+      addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
     }
-    addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
   }

   @Override
@@ -363,9 +432,12 @@ public void delete(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
     }
-    addEvent(DELETE_OPERATION, currentTransaction, -1, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+    } else {
+      addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+    }
     rowCountDelta--;
-
   }

   @Override
@@ -380,6 +452,12 @@ public void flush() throws IOException {
     long len = writer.writeIntermediateFooter();
     flushLengths.writeLong(len);
     OrcInputFormat.SHIMS.hflush(flushLengths);
+    // Flush deleteEvent writers too, if any.
+    if (deleteEventWriter != null && deleteEventIndexBuilder.acidStats.deletes > 0) {
+      len = deleteEventWriter.writeIntermediateFooter();
+      flushLengths.writeLong(len);
+      OrcInputFormat.SHIMS.hflush(flushLengths);
+    }
   }

   @Override
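Under split-update the events are routed as follows: inserts keep going to the regular delta writer, a delete writes a tombstone (ROW is null) for the original row id into the delete_delta writer, and an update is decomposed into such a delete plus a fresh insert with a newly assigned row id. A runnable sketch of just that routing, with plain strings standing in for the ORC writers:

import java.util.ArrayList;
import java.util.List;

// Illustration of the routing in addSplitUpdateEvent above; the Event record and the
// directory names here are stand-ins, not the ORC writer API.
public class SplitUpdateRoutingSketch {

  record Event(String targetDir, String kind, long rowId) {}

  static List<Event> route(String operation, long originalRowId, long nextInsertRowId) {
    List<Event> events = new ArrayList<>();
    if (operation.equals("INSERT")) {
      events.add(new Event("delta", "insert", nextInsertRowId));       // unchanged insert path
      return events;
    }
    // DELETE and UPDATE both tombstone the original row in the delete_delta file.
    events.add(new Event("delete_delta", "delete", originalRowId));
    if (operation.equals("UPDATE")) {
      // The new image of the row becomes an ordinary insert with a new row id.
      events.add(new Event("delta", "insert", nextInsertRowId));
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(route("UPDATE", 7L, 0L));
    // [Event[targetDir=delete_delta, kind=delete, rowId=7], Event[targetDir=delta, kind=insert, rowId=0]]
  }
}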
@@ -387,15 +465,32 @@ public void close(boolean abort) throws IOException {
     if (abort) {
       if (flushLengths == null) {
         fs.delete(path, false);
+        if (deleteEventPath != null) {
+          fs.delete(deleteEventPath, false);
+        }
       }
     } else {
       if (writer != null) writer.close();
+      if (deleteEventWriter != null) {
+        if (deleteEventIndexBuilder.acidStats.deletes > 0) {
+          // Only need to write out & close the delete_delta if there have been any.
+          deleteEventWriter.close();
+        } else {
+          // Just remove delete_delta, if there have been no delete events.
+          fs.delete(deleteEventPath, false);
+        }
+      }
+    }
     if (flushLengths != null) {
       flushLengths.close();
       fs.delete(OrcAcidUtils.getSideFile(path), false);
+      if (deleteEventPath != null) {
+        fs.delete(OrcAcidUtils.getSideFile(deleteEventPath), false);
+      }
     }
     writer = null;
+    deleteEventWriter = null;
   }

   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 82abd52..1ca62c0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -3062,10 +3063,11 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
   private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
                                     List<Path> newFiles) throws HiveException {
-    // The layout for ACID files is table|partname/base|delta/bucket
+    // The layout for ACID files is table|partname/base|delta|delete_delta/bucket
     // We will always only be writing delta files.  In the buckets created by FileSinkOperator
-    // it will look like bucket/delta/bucket.  So we need to move that into the above structure.
-    // For the first mover there will be no delta directory, so we can move the whole directory.
+    // it will look like bucket/delta|delete_delta/bucket.  So we need to move that into
+    // the above structure. For the first mover there will be no delta directory,
+    // so we can move the whole directory.
     // For everyone else we will need to just move the buckets under the existing delta
     // directory.
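With split-update, the per-partition layout referenced in the comment above gains a third kind of directory: base_N and delta_x_y as before, plus delete_delta_x_y holding only delete events, each containing the usual bucket files. The helper below only illustrates the naming convention that the new deleteEventDeltaDirFilter used further down presumably relies on; it is a standalone sketch, not the Hive implementation.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Standalone sketch: recognize delete_delta_* directories by name. The prefix is assumed
// to match AcidUtils.DELETE_DELTA_PREFIX; AcidUtils.deleteEventDeltaDirFilter is the real filter.
public class DeleteDeltaFilterSketch {

  private static final String DELETE_DELTA_PREFIX = "delete_delta_";

  static final PathFilter DELETE_EVENT_DELTA_DIR_FILTER =
      path -> path.getName().startsWith(DELETE_DELTA_PREFIX);

  public static void main(String[] args) {
    // Layout per the comment above: warehouse/tbl/part/{base_N, delta_x_y, delete_delta_x_y}/bucket_00000
    Path deleteDelta = new Path("/warehouse/t/p=1/delete_delta_0000005_0000005");
    Path plainDelta  = new Path("/warehouse/t/p=1/delta_0000005_0000005");
    System.out.println(DELETE_EVENT_DELTA_DIR_FILTER.accept(deleteDelta)); // true
    System.out.println(DELETE_EVENT_DELTA_DIR_FILTER.accept(plainDelta));  // false
  }
}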
@@ -3088,49 +3090,58 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
       for (FileStatus origBucketStat : origBucketStats) {
         Path origBucketPath = origBucketStat.getPath();
-        LOG.debug("Acid move looking for delta files in bucket " + origBucketPath);
+        moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter,
+            fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+        moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter,
+            fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+      }
+    }
+  }
+
+  private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs,
+                                         Path dst, Path origBucketPath, Set<Path> createdDeltaDirs,
+                                         List<Path> newFiles) throws HiveException {
+    LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
-        FileStatus[] deltaStats = null;
-        try {
-          deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
-        } catch (IOException e) {
-          throw new HiveException("Unable to look for delta files in original bucket " +
-              origBucketPath.toUri().toString(), e);
-        }
-        LOG.debug("Acid move found " + deltaStats.length + " delta files");
-
-        for (FileStatus deltaStat : deltaStats) {
-          Path deltaPath = deltaStat.getPath();
-          // Create the delta directory.  Don't worry if it already exists,
-          // as that likely means another task got to it first.  Then move each of the buckets.
-          // it would be more efficient to try to move the delta with it's buckets but that is
-          // harder to make race condition proof.
-          Path deltaDest = new Path(dst, deltaPath.getName());
+    FileStatus[] deltaStats = null;
+    try {
+      deltaStats = fs.listStatus(origBucketPath, pathFilter);
+    } catch (IOException e) {
+      throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
+          origBucketPath.toUri().toString(), e);
+    }
+    LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+
+    for (FileStatus deltaStat : deltaStats) {
+      Path deltaPath = deltaStat.getPath();
+      // Create the delta directory.  Don't worry if it already exists,
+      // as that likely means another task got to it first.  Then move each of the buckets.
+      // it would be more efficient to try to move the delta with it's buckets but that is
+      // harder to make race condition proof.
+      Path deltaDest = new Path(dst, deltaPath.getName());
+      try {
+        if (!createdDeltaDirs.contains(deltaDest)) {
           try {
-            if (!createdDeltaDirs.contains(deltaDest)) {
-              try {
-                fs.mkdirs(deltaDest);
-                createdDeltaDirs.add(deltaDest);
-              } catch (IOException swallowIt) {
-                // Don't worry about this, as it likely just means it's already been created.
-                LOG.info("Unable to create delta directory " + deltaDest +
-                    ", assuming it already exists: " + swallowIt.getMessage());
-              }
-            }
-            FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
-            LOG.debug("Acid move found " + bucketStats.length + " bucket files");
-            for (FileStatus bucketStat : bucketStats) {
-              Path bucketSrc = bucketStat.getPath();
-              Path bucketDest = new Path(deltaDest, bucketSrc.getName());
-              LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
-                  bucketDest.toUri().toString());
-              fs.rename(bucketSrc, bucketDest);
-              if (newFiles != null) newFiles.add(bucketDest);
-            }
-          } catch (IOException e) {
-            throw new HiveException("Error moving acid files " + e.getMessage(), e);
+            fs.mkdirs(deltaDest);
+            createdDeltaDirs.add(deltaDest);
+          } catch (IOException swallowIt) {
+            // Don't worry about this, as it likely just means it's already been created.
+            LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+                ", assuming it already exists: " + swallowIt.getMessage());
           }
         }
+        FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+        LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+        for (FileStatus bucketStat : bucketStats) {
+          Path bucketSrc = bucketStat.getPath();
+          Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+          LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+              bucketDest.toUri().toString());
+          fs.rename(bucketSrc, bucketDest);
+          if (newFiles != null) newFiles.add(bucketDest);
+        }
+      } catch (IOException e) {
+        throw new HiveException("Error moving acid files " + e.getMessage(), e);
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 8cf261d..1131147 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -100,6 +100,8 @@
   private boolean isMetadataOnly = false;

   private boolean isAcidTable;
+
+  private AcidUtils.AcidOperationalProperties acidOperationalProperties;

   private transient TableSample tableSample;

@@ -127,6 +129,11 @@ public TableScanDesc(final String alias, List<VirtualColumn> vcs, Table tblMetad
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
     isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
+    if (isAcidTable) {
+      acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata);
+    } else {
+      acidOperationalProperties = null;
+    }
   }

   @Override
@@ -158,6 +165,10 @@ public String getTbl() {
   public boolean isAcidTable() {
     return isAcidTable;
   }
+
+  public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() {
+    return acidOperationalProperties;
+  }

   @Explain(displayName = "Output", explainLevels = { Level.USER })
   public List<String> getOutputColumnNames() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 6caca98..8f58b16 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -128,7 +128,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
     job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
-    overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
+    overrideMRProps(job, t.getParameters());
     if (ci.properties != null) { // override MR properties and general tblproperties if applicable
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
@@ -137,6 +137,11 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
     //to do the final move
     job.setBoolean("mapreduce.map.speculative", false);
+
+    // Set appropriate Acid readers/writers based on the table properties.
+    AcidUtils.setAcidOperationalProperties(job,
+        AcidUtils.getAcidOperationalProperties(t.getParameters()));
+
     return job;
   }

@@ -494,6 +499,8 @@ public String toString() {
     @Override
     public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
       Path baseDir = null;
+      AcidUtils.AcidOperationalProperties acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(entries);
       if (entries.get(BASE_DIR) != null) baseDir = new Path(entries.get(BASE_DIR));
       StringableList tmpDeltaDirs = new StringableList(entries.get(DELTA_DIRS));
       Path[] deltaDirs = tmpDeltaDirs.toArray(new Path[tmpDeltaDirs.size()]);
@@ -502,23 +509,56 @@ public String toString() {
       for (Path dir : dirsToSearch) {
         FileSystem fs = dir.getFileSystem(entries);
-        // If this is a base or delta directory, then we need to be looking for the bucket files.
-        // But if it's a legacy file then we need to add it directly.
-        if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
-            dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
-          boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
-          FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
-          for(FileStatus f : files) {
-            // For each file, figure out which bucket it is.
-            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
-            addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
+        if (acidOperationalProperties.isSplitUpdate()) {
+          // TODO: When we have split-update, there are two kinds of delta directories:
+          // the delta_x_y/ directory, which has only insert events, and
+          // the delete_delta_x_y/ directory, which has only the delete events.
+          // The clever thing about this kind of splitting is that everything in the delta_x_y/
+          // directory can be processed as base files. However, this is left out currently
+          // as an improvement for the future.
+
+          if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
+              dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
+              dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+            boolean sawBase =
+                dir.getName().startsWith(AcidUtils.BASE_PREFIX) ? true : false;
+
+            FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+            for(FileStatus f : files) {
+              // For each file, figure out which bucket it is.
+              Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+              addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
+            }
+          } else {
+            // Legacy file, see if it's a bucket file
+            Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName());
+            addFileToMap(matcher, dir, true, splitToBucketMap);
           }
+        } else {
-          // Legacy file, see if it's a bucket file
-          Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName());
-          addFileToMap(matcher, dir, true, splitToBucketMap);
+          // There is only one kind of delta directory, and therefore all the delta files are
+          // treated the usual way.
+
+          // If this is a base or delta directory, then we need to be looking for the bucket files.
+          // But if it's a legacy file then we need to add it directly.
+          if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
+              dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+            boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+            FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+            for(FileStatus f : files) {
+              // For each file, figure out which bucket it is.
+              Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+              addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
+            }
+          } else {
+            // Legacy file, see if it's a bucket file
+            Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName());
+            addFileToMap(matcher, dir, true, splitToBucketMap);
+          }
         }
       }
+
+
       List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size());
       for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
         BucketTracker bt = e.getValue();
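For the compactor, the operational properties travel through the JobConf (createBaseJobConf above calls AcidUtils.setAcidOperationalProperties), and getSplits then decides whether delete_delta_* directories are scanned for bucket files just like base_*/delta_* or fall through to the legacy-file branch. The standalone sketch below reproduces only that directory-name dispatch; the prefixes are assumed to mirror the AcidUtils constants used above.

// Illustrative dispatch only, not CompactorMR code: which directory names get scanned for
// bucket files when building compaction splits.
public class CompactorDirDispatchSketch {

  static boolean scansForBuckets(String dirName, boolean splitUpdate) {
    boolean baseOrDelta = dirName.startsWith("base_") || dirName.startsWith("delta_");
    boolean deleteDelta = dirName.startsWith("delete_delta_");
    return splitUpdate ? (baseOrDelta || deleteDelta) : baseOrDelta;
  }

  public static void main(String[] args) {
    System.out.println(scansForBuckets("delta_0000007_0000007", true));         // true
    System.out.println(scansForBuckets("delete_delta_0000007_0000007", true));  // true
    System.out.println(scansForBuckets("delete_delta_0000007_0000007", false)); // false, legacy branch
  }
}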
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e76c925..100822f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -299,7 +299,8 @@ public void testNonAcidToAcidConversion1() throws Exception {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));

     // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL +
+        " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='legacy')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
@@ -430,7 +431,8 @@ public void testNonAcidToAcidConversion2() throws Exception {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));

     // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL +
+        " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='legacy')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
@@ -552,7 +554,8 @@ public void testNonAcidToAcidConversion3() throws Exception {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));

     // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL +
+        " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='legacy')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 5745dee..8520256 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -92,14 +92,14 @@ public void testParsing() throws Exception {
     Path dir = new Path("/tmp/tbl");
     Configuration conf = new Configuration();
     AcidOutputFormat.Options opts =
-        AcidUtils.parseBaseBucketFilename(new Path(dir, "base_567/bucket_123"),
+        AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"),
             conf);
     assertEquals(false, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
     assertEquals(567, opts.getMaximumTransactionId());
     assertEquals(0, opts.getMinimumTransactionId());
     assertEquals(123, opts.getBucket());
-    opts = AcidUtils.parseBaseBucketFilename(new Path(dir, "000123_0"), conf);
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf);
     assertEquals(true, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
     assertEquals(123, opts.getBucket());
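The conversion tests above pin 'transactional_properties'='legacy' so that a table converted from non-ACID keeps the exact pre-patch directory layout, while the golden files below show freshly created ACID tables picking up 'default'. A hedged sketch, meant to live inside TestTxnCommands2 (runStatementOnDriver is that class's helper), of how the two settings could be exercised side by side; the table names are made up for illustration and this test is not part of the patch:

  @Test
  public void testTransactionalPropertiesLayouts() throws Exception {
    // Split-update layout: expect delta_* and delete_delta_* directories after an update.
    runStatementOnDriver("create table acid_split(a int, b int) clustered by (a) into 2 buckets" +
        " stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
    // Legacy layout: updates keep writing a single delta_* directory, as before this patch.
    runStatementOnDriver("create table acid_legacy(a int, b int) clustered by (a) into 2 buckets" +
        " stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='legacy')");
  }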
diff --git ql/src/test/results/clientpositive/acid_globallimit.q.out ql/src/test/results/clientpositive/acid_globallimit.q.out
index 93246e8..b0dfa40 100644
--- ql/src/test/results/clientpositive/acid_globallimit.q.out
+++ ql/src/test/results/clientpositive/acid_globallimit.q.out
@@ -47,6 +47,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	102202
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
diff --git ql/src/test/results/clientpositive/acid_table_stats.q.out ql/src/test/results/clientpositive/acid_table_stats.q.out
index f662a48..eb50b01 100644
--- ql/src/test/results/clientpositive/acid_table_stats.q.out
+++ ql/src/test/results/clientpositive/acid_table_stats.q.out
@@ -36,6 +36,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE
 Table Parameters:
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -91,10 +92,10 @@ Database:           	default
 Table:              	acid
 #### A masked pattern was here ####
 Partition Parameters:
-	numFiles            	2
+	numFiles            	4
 	numRows             	0
 	rawDataSize         	0
-	totalSize           	3837
+	totalSize           	4249
 #### A masked pattern was here ####

 # Storage Information
@@ -132,9 +133,9 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: acid
-            Statistics: Num rows: 1 Data size: 3837 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 4249 Basic stats: PARTIAL Column stats: NONE
             Select Operator
-              Statistics: Num rows: 1 Data size: 3837 Basic stats: PARTIAL Column stats: NONE
+              Statistics: Num rows: 1 Data size: 4249 Basic stats: PARTIAL Column stats: NONE
               Group By Operator
                 aggregations: count()
                 mode: hash
@@ -208,10 +209,10 @@ Table:              	acid
 #### A masked pattern was here ####
 Partition Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
-	numFiles            	2
+	numFiles            	4
 	numRows             	1000
-	rawDataSize         	208000
-	totalSize           	3837
+	rawDataSize         	208336
+	totalSize           	4249
 #### A masked pattern was here ####

 # Storage Information
@@ -257,10 +258,10 @@ Table:              	acid
 #### A masked pattern was here ####
 Partition Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"key\":\"true\",\"value\":\"true\"}}
-	numFiles            	2
+	numFiles            	4
 	numRows             	1000
-	rawDataSize         	208000
-	totalSize           	3837
+	rawDataSize         	208336
+	totalSize           	4249
 #### A masked pattern was here ####

 # Storage Information
@@ -316,10 +317,10 @@ Database:           	default
 Table:              	acid
 #### A masked pattern was here ####
 Partition Parameters:
-	numFiles            	4
+	numFiles            	8
 	numRows             	1000
-	rawDataSize         	208000
-	totalSize           	7689
+	rawDataSize         	208336
+	totalSize           	8513
 #### A masked pattern was here ####

 # Storage Information
@@ -365,10 +366,10 @@ Table:              	acid
 #### A masked pattern was here ####
 Partition Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
-	numFiles            	4
+	numFiles            	8
 	numRows             	2000
-	rawDataSize         	416000
-	totalSize           	7689
+	rawDataSize         	416672
+	totalSize           	8513
 #### A masked pattern was here ####

 # Storage Information
diff --git ql/src/test/results/clientpositive/autoColumnStats_4.q.out ql/src/test/results/clientpositive/autoColumnStats_4.q.out
index bf4e0bb..1b16804 100644
--- ql/src/test/results/clientpositive/autoColumnStats_4.q.out
+++ ql/src/test/results/clientpositive/autoColumnStats_4.q.out
@@ -30,6 +30,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	0
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -202,6 +203,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	1714
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -240,11 +242,12 @@ Retention:          	0
 #### A masked pattern was here ####
 Table Type:         	MANAGED_TABLE
 Table Parameters:
-	numFiles            	4
+	numFiles            	6
 	numRows             	0
 	rawDataSize         	0
-	totalSize           	2719
+	totalSize           	3081
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
diff --git ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
index 2a9eff0..57af5e2 100644
--- ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
+++ ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
@@ -64,6 +64,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	0
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -117,6 +118,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	377237
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -314,6 +316,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	1508
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -438,6 +441,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	3016
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information
@@ -546,6 +550,7 @@ Table Parameters:
 	rawDataSize         	0
 	totalSize           	380253
 	transactional       	true
+	transactional_properties	default
 #### A masked pattern was here ####

 # Storage Information