diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 2405dc1..6f6b245 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -41,26 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; -import org.apache.orc.ColumnStatistics; -import org.apache.orc.OrcUtils; -import org.apache.orc.StripeInformation; -import org.apache.orc.StripeStatistics; -import org.apache.orc.TypeDescription; -import org.apache.orc.impl.OrcTail; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -73,6 +53,7 @@ import org.apache.hadoop.hive.metastore.Metastore; import org.apache.hadoop.hive.metastore.Metastore.SplitInfos; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -87,13 +68,14 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; -import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf; import org.apache.hadoop.hive.ql.io.SyntheticFileId; +import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -101,9 +83,19 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -116,7 +108,16 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.orc.ColumnStatistics; import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.InStream; +import org.apache.orc.impl.OrcTail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -1710,6 +1711,12 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte final RowReader inner = getReader(inputSplit, options); if (vectorMode) { + if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate()) { + // When split-update is turned on for ACID, a more optimized vectorized batch reader + // can be created. + return (org.apache.hadoop.mapred.RecordReader) + new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter); + } return (org.apache.hadoop.mapred.RecordReader) new VectorizedOrcAcidRowReader(inner, conf, Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index e46ca51..f4e06ee 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -49,7 +50,6 @@ import org.apache.orc.TypeDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl implements RecordReader { @@ -79,6 +79,10 @@ boolean ensureBatch() throws IOException { return true; } + public VectorizedRowBatch createRowBatch() { + return this.schema.createRowBatch(); + } + @Override public long getRowNumber() { return baseRow + rowInBatch; @@ -129,6 +133,7 @@ public Object next(Object previous) throws IOException { return previous; } + @Override public boolean nextBatch(VectorizedRowBatch theirBatch) throws IOException { // If the user hasn't been reading by row, use the fast path. 
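+    // rowInBatch >= batch.size means nothing from the internally buffered batch is still
+    // pending, so the next batch can be read straight into the caller's batch instead of
+    // being copied out row by row.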
if (rowInBatch >= batch.size) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java new file mode 100644 index 0000000..68d0d15 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -0,0 +1,707 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; + +public class VectorizedOrcAcidRowBatchReader + implements org.apache.hadoop.mapred.RecordReader { + + private org.apache.hadoop.hive.ql.io.orc.RecordReader baseReader; + private VectorizedRowBatchCtx rbCtx; + private VectorizedRowBatch vectorizedRowBatchDup; + private long offset; + private long length; + private float progress = 0.0f; + private Object[] partitionValues; + private boolean addPartitionCols = true; + private ValidTxnList validTxnList; + private DeleteEventRegistry deleteEventRegistry; + + public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, + Reporter reporter) throws IOException { + + final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); + final AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(conf); + + // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is + // enabled for an ACID case and the file format is ORC. 
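+    // With split-update (transactional_properties='default'), an UPDATE is written as a
+    // delete event plus an insert, so base and delta files contain only insert events and
+    // all delete events are isolated in delete_delta directories. That separation is what
+    // lets this reader stream whole vectorized batches from the base/delta side and filter
+    // them against the delete events, instead of sort-merging the events row by row.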
+ boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate() + || !(inputSplit instanceof OrcSplit); + if (isReadNotAllowed) { + OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf); + } + final OrcSplit orcSplit = (OrcSplit) inputSplit; + + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + + reporter.setStatus(orcSplit.toString()); + Path path = orcSplit.getPath(); + + OrcFile.ReaderOptions opts = OrcFile.readerOptions(conf); + if (orcSplit.hasFooter()) { + opts.orcTail(orcSplit.getOrcTail()); + } + opts.maxLength(orcSplit.getFileLength()); + Reader reader = OrcFile.createReader(path, opts); + /** + * Do we have schema on read in the configuration variables? + */ + List types = reader.getTypes(); + int dataColumns = rbCtx.getDataColumnCount(); + TypeDescription schema = + OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); + Reader.Options readerOptions = new Reader.Options().schema(schema); + + final List schemaTypes = OrcUtils.getOrcTypes(schema); + readerOptions.include(OrcInputFormat.genIncludedColumns(schemaTypes, conf, true)); + OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true); + readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + + this.offset = orcSplit.getStart(); + this.length = orcSplit.getLength(); + + // Careful with the range here now, we do not want to read the whole base file like deltas. + this.baseReader = reader.rowsOptions(readerOptions.range(offset, length)); + + // VectorizedRowBatchDup schema is always picked up from the file schema because it is + // supposed to read the actual vectorized row batches from the file irrespective of what is + // to be passed up the operator pipeline. + if (this.baseReader instanceof RecordReaderImpl) { + this.vectorizedRowBatchDup = ((RecordReaderImpl) this.baseReader).createRowBatch(); + } else { + throw new IOException("Failed to create vectorized row batch for the given reader."); + } + + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, orcSplit, partitionValues); + } else { + partitionValues = null; + } + + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, + Long.MAX_VALUE + ":"); + this.validTxnList = new ValidReadTxnList(txnString); + + try { + // See if we can load all the delete events from all the delete deltas in memory... + this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, readerOptions); + } catch (DeleteEventsOverflowMemoryException e) { + // If not, then create a set of hanging readers that do sort-merge to find the next smallest + // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). + this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, readerOptions); + } + } + + public static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + Path path = orcSplit.getPath(); + // Infer whether its an original file or base/delta file based on the path. 
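+    // Files written before the table became transactional ("original" files) sit directly
+    // under the table or partition directory, while ACID data files sit under base_N/,
+    // delta_x_y/ or delete_delta_x_y/ subdirectories, so the parent directory name is
+    // enough to tell the two apart and to locate the ACID root directory below.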
+ boolean isOriginal = true; + if (path.getParent().getName().startsWith(AcidUtils.BASE_PREFIX) + || path.getParent().getName().startsWith(AcidUtils.DELTA_PREFIX) + || path.getParent().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + isOriginal = false; + } + + Path root; + if (orcSplit.hasBase()) { + if (isOriginal) { + root = path.getParent(); + } else { + root = path.getParent().getParent(); + } + } else { + root = path; + } + return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas()); + } + + @Override + public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + try { + // Check and update partition cols if necessary. Ideally, this should be done + // in CreateValue as the partition is constant per split. But since Hive uses + // CombineHiveRecordReader and + // as this does not call CreateValue for each new RecordReader it creates, this check is + // required in next() + if (addPartitionCols) { + if (partitionValues != null) { + rbCtx.addPartitionColsToBatch(value, partitionValues); + } + addPartitionCols = false; + } + if (vectorizedRowBatchDup == null) { + if (!baseReader.nextBatch(value)) { + return false; + } + } else { + if (!baseReader.nextBatch(vectorizedRowBatchDup)) { + return false; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + // Once we have read the VectorizedRowBatchDup from the file, there are two kinds of cases + // for which we might have to discard rows from the batch: + // Case 1- when the row is created by a transaction that is not valid, or + // Case 2- when the row has been deleted. + // We will go through the batch to discover rows which match any of the cases and specifically + // remove them from the selected vector. Of course, selectedInUse should also be set. + + BitSet selectedBitSet = new BitSet(vectorizedRowBatchDup.size); + if (vectorizedRowBatchDup.selectedInUse) { + // When selectedInUse is true, start with every bit set to false and selectively set + // certain bits to true based on the selected[] vector. + selectedBitSet.set(0, vectorizedRowBatchDup.size, false); + for (int j = 0; j < vectorizedRowBatchDup.size; ++j) { + int i = vectorizedRowBatchDup.selected[j]; + selectedBitSet.set(i); + } + } else { + // When selectedInUse is set to false, everything in the batch is selected. + selectedBitSet.set(0, vectorizedRowBatchDup.size, true); + } + + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(vectorizedRowBatchDup, selectedBitSet); + + // Case 2- find rows which have been deleted. + this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchDup, selectedBitSet); + + if (selectedBitSet.cardinality() == vectorizedRowBatchDup.size) { + // None of the cases above matched and everything is selected. Hence, we will use the + // same values for the selected and selectedInUse. + value.size = vectorizedRowBatchDup.size; + value.selected = vectorizedRowBatchDup.selected; + value.selectedInUse = vectorizedRowBatchDup.selectedInUse; + } else { + value.size = selectedBitSet.cardinality(); + value.selectedInUse = true; + value.selected = new int[selectedBitSet.cardinality()]; + // This loop fills up the selected[] vector with all the index positions that are selected. 
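+      // Walk the surviving bits in ascending order; each set bit is the batch index of a
+      // row that passed both the transaction-validity check and the delete-event check,
+      // and selected[] must list those indices in increasing order for the downstream
+      // operators.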
+ for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0; + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) { + value.selected[selectedItr] = setBitIndex; + } + } + + // Finally, link up the columnVector from the duplicate VectorizedRowBatch to outgoing batch. + StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchDup.cols[OrcRecordUpdater.ROW]; + for (int i = 0; i < value.getDataColumnCount(); ++i) { + // Transfer columnVector objects from duplicate to outgoing. + value.cols[i] = payloadStruct.fields[i]; + payloadStruct.fields[i] = null; + } + + progress = baseReader.getProgress(); + return true; + } + + private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { + if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { + // When we have repeating values, we can unset the whole bitset at once + // if the repeating value is not a valid transaction. + long currentTransactionIdForBatch = ((LongColumnVector) + batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; + if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { + selectedBitSet.clear(0, batch.size); + } + return; + } + long[] currentTransactionVector = + ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; + // Loop through the bits that are set to true and mark those rows as false, if their + // current transactions are not valid. + for (int setBitIndex = selectedBitSet.nextSetBit(0); + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { + if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) { + selectedBitSet.clear(setBitIndex); + } + } + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + return rbCtx.createVectorizedRowBatch(); + } + + @Override + public long getPos() throws IOException { + return offset + (long) (progress * length); + } + + @Override + public void close() throws IOException { + this.baseReader.close(); + this.deleteEventRegistry.close(); + } + + @Override + public float getProgress() throws IOException { + return progress; + } + + public interface DeleteEventRegistry { + public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException; + public void close() throws IOException; + } + + public class SortMergedDeleteEventRegistry implements DeleteEventRegistry { + private OrcRawRecordMerger deleteRecords; + private OrcRawRecordMerger.ReaderKey deleteRecordKey; + private OrcStruct deleteRecordValue; + private boolean isDeleteRecordAvailable = true; + + public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) + throws IOException { + // Set the range on the readerOptions to 0 to INTEGER_MAX, because we always want to read + // all the delete delta files. + readerOptions.range(0, Long.MAX_VALUE); + + final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); + if (deleteDeltas.length > 0) { + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, + validTxnList, readerOptions, deleteDeltas); + this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); + this.deleteRecordValue = this.deleteRecords.createValue(); + // Initialize the first value in the delete reader. 
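+      // OrcRawRecordMerger hands back the delete events from all delete deltas already
+      // merged into record-identifier order, which is what allows findDeletedRecords()
+      // to advance a single cursor through them while walking each (likewise sorted) batch.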
+ this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue); + } else { + this.isDeleteRecordAvailable = false; + this.deleteRecordKey = null; + this.deleteRecordValue = null; + this.deleteRecords = null; + } + } + + @Override + public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + throws IOException { + if (!isDeleteRecordAvailable) { + return; + } + + long[] originalTransaction = + batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + long[] bucket = + batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector; + long[] rowId = + batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + + // The following repeatedX values will be set, if any of the columns are repeating. + long repeatedOriginalTransaction = (originalTransaction != null) ? -1 + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + long repeatedBucket = (bucket != null) ? -1 + : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + long repeatedRowId = (rowId != null) ? -1 + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0]; + + + // Get the first valid row in the batch still available. + int firstValidIndex = selectedBitSet.nextSetBit(0); + if (firstValidIndex == -1 || firstValidIndex >= batch.size) { + return; // Everything in the batch has already been filtered out. + } + RecordIdentifier firstRecordIdInBatch = + new RecordIdentifier( + originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction, + bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket, + rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); + + // Get the last valid row in the batch still available. + int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1); + RecordIdentifier lastRecordIdInBatch = + new RecordIdentifier( + originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, + bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket, + rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId); + + // We must iterate over all the delete records, until we find one record with + // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records. + while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) { + isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); + if (!isDeleteRecordAvailable) return; // exhausted all delete records, return. + } + + // If we are here, then we have established that firstRecordInBatch <= deleteRecord. + // Now continue marking records which have been deleted until we reach the end of the batch + // or we exhaust all the delete records. + + int currIndex = firstValidIndex; + RecordIdentifier currRecordIdInBatch = new RecordIdentifier(); + while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) { + currRecordIdInBatch.setValues( + (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction, + (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket, + (rowId != null) ? 
rowId[currIndex] : repeatedRowId); + + if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) { + // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted. + selectedBitSet.clear(currIndex); + currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. + } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) { + // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the + // next record in the batch. + // But before that, can we short-circuit and skip the entire batch itself + // by checking if the deleteRecordId > lastRecordInBatch? + if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) { + return; // Yay! We short-circuited, skip everything remaining in the batch and return. + } + currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. + } else { + // We have deleteRecordId < currRecordIdInBatch, we must now move on to find + // next the larger deleteRecordId that can possibly match anything in the batch. + isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); + } + } + } + + @Override + public void close() throws IOException { + this.deleteRecords.close(); + } + } + + public class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { + // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas into + // memory. To prevent out-of-memory errors, following is a rough heuristic that prevents + // creation of an object of this class if the total number of delete events exceed this value. + // Roughly it has been set to 10 million delete events per bucket (~160 MB) + public static final int MAX_DELETE_EVENTS_TO_LOAD_IN_MEMORY = 10 * 1000 * 1000; + + public class DeleteRecordKey implements Comparable { + private long originalTransactionId; + private long rowId; + public DeleteRecordKey() { + this.originalTransactionId = -1; + this.rowId = -1; + } + public DeleteRecordKey(long otid, long rowId) { + this.originalTransactionId = otid; + this.rowId = rowId; + } + public void set(long otid, long rowId) { + this.originalTransactionId = otid; + this.rowId = rowId; + } + @Override + public int compareTo(DeleteRecordKey other) { + if (other == null) { + return -1; + } + if (originalTransactionId != other.originalTransactionId) { + return originalTransactionId < other.originalTransactionId ? -1 : 1; + } + if (rowId != other.rowId) { + return rowId < other.rowId ? -1 : 1; + } + return 0; + } + } + + public class DeleteReaderValue { + private VectorizedRowBatch batch; + private RecordReader recordReader; + private int indexPtrInBatch; + private int bucketForSplit; // The bucket value should be same for all the records. + private ValidTxnList validTxnList; + + public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, + ValidTxnList validTxnList) throws IOException { + this.recordReader = deleteDeltaReader.rowsOptions(readerOptions); + this.bucketForSplit = bucket; + this.batch = deleteDeltaReader.getSchema().createRowBatch(); + if (!recordReader.nextBatch(batch)) { // Read the first batch. + this.batch = null; // Oh! the first batch itself was null. Close the reader. 
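+          // next() below treats a null batch as an exhausted, effectively empty delete delta.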
+ } + this.indexPtrInBatch = 0; + this.validTxnList = validTxnList; + } + + public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { + if (batch == null) { + return false; + } + boolean isValidNext = false; + while (!isValidNext) { + if (indexPtrInBatch >= batch.size) { + // We have exhausted our current batch, read the next batch. + if (recordReader.nextBatch(batch)) { + // Whenever we are reading a batch, we must ensure that all the records in the batch + // have the same bucket id as the bucket id of the split. If not, throw exception. + // NOTE: this assertion might not hold, once virtual bucketing is in place. However, + // it should be simple to fix that case. Just replace check for bucket equality with + // a check for valid bucket mapping. + long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating) + || (bucketForRecord != bucketForSplit)){ + throw new IOException("Corrupted records with different bucket ids " + + "from the containing bucket file found!"); + } + indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning. + } else { + return false; // no more batches to read, exhausted the reader. + } + } + long originalTransaction = + batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? + ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0] + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[indexPtrInBatch]; + long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch]; + long currentTransaction = + batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? + ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0] + : ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[indexPtrInBatch]; + ++indexPtrInBatch; + if (validTxnList.isTxnValid(currentTransaction)) { + isValidNext = true; + deleteRecordKey.set(originalTransaction, rowId); + } + } + return isValidNext; + } + + public void close() throws IOException { + this.recordReader.close(); + } + } + + private class OtidInterval implements Comparable { + long originalTransactionId; + int fromIndex; // inclusive + int toIndex; // exclusive + + public OtidInterval(long otid, int fromIndex, int toIndex) { + this.originalTransactionId = otid; + this.fromIndex = fromIndex; + this.toIndex = toIndex; + } + + @Override + public int compareTo(OtidInterval other) { + if (originalTransactionId != other.originalTransactionId) { + return originalTransactionId < other.originalTransactionId ? -1 : 1; + } + return 0; + } + } + + private TreeMap sortMerger; + private long rowIds[]; + private OtidInterval otidIntervals[]; + private ValidTxnList validTxnList; + + public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, + Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + // Set the range on the readerOptions to 0 to INTEGER_MAX, because we always want to read + // all the delete delta files. 
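+      // (The call below actually passes Long.MAX_VALUE as the upper bound, i.e. the whole
+      // file; delete deltas are read in full regardless of the split boundaries.)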
+ readerOptions.range(0, Long.MAX_VALUE); + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, + Long.MAX_VALUE + ":"); + this.validTxnList = new ValidReadTxnList(txnString); + this.sortMerger = new TreeMap(); + this.rowIds = null; + this.otidIntervals = null; + + final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); + if (deleteDeltaDirs.length > 0) { + int totalDeleteEventCount = 0; + for (Path deleteDeltaDir : deleteDeltaDirs) { + Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket); + FileSystem fs = deleteDeltaFile.getFileSystem(conf); + long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile); + if (length != -1 && fs.exists(deleteDeltaFile)) { + Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, + OrcFile.readerOptions(conf).maxLength(length)); + AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader); + if (acidStats.deletes == 0) { + continue; // just a safe check to ensure that we are not reading empty delete files. + } + totalDeleteEventCount += acidStats.deletes; + DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, + readerOptions, bucket, validTxnList); + DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); + if (deleteReaderValue.next(deleteRecordKey)) { + sortMerger.put(deleteRecordKey, deleteReaderValue); + } else { + deleteReaderValue.close(); + } + } + } + if (totalDeleteEventCount > 0) { + if (totalDeleteEventCount > MAX_DELETE_EVENTS_TO_LOAD_IN_MEMORY) { + throw new DeleteEventsOverflowMemoryException(); + } + // Initialize the rowId array when we have some delete events. + rowIds = new long[totalDeleteEventCount]; + readAllDeleteEventsFromDeletaDeltas(); + } + } + } + + private void readAllDeleteEventsFromDeletaDeltas() throws IOException { + int distinctOtids = 0; + long lastSeenOtid = -1; + long otids[] = new long[rowIds.length]; + int index = 0; + while (!sortMerger.isEmpty()) { + Entry entry = sortMerger.pollFirstEntry(); + DeleteRecordKey deleteRecordKey = entry.getKey(); + DeleteReaderValue deleteReaderValue = entry.getValue(); + otids[index] = deleteRecordKey.originalTransactionId; + rowIds[index] = deleteRecordKey.rowId; + ++index; + if (lastSeenOtid != deleteRecordKey.originalTransactionId) { + ++distinctOtids; + lastSeenOtid = deleteRecordKey.originalTransactionId; + } + if (deleteReaderValue.next(deleteRecordKey)) { + sortMerger.put(deleteRecordKey, deleteReaderValue); + } else { + deleteReaderValue.close(); // Exhausted reading all records, close the reader. + } + } + // Once we have processed all the delete events and seen all the distinct otids, + // we compress the otids into OtidInterval data structure that records + // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid. + this.otidIntervals = new OtidInterval[distinctOtids]; + lastSeenOtid = otids[0]; + int fromIndex = 0, pos = 0; + for (int i = 1; i < otids.length; ++i) { + if (otids[i] != lastSeenOtid) { + otidIntervals[pos] = new OtidInterval(lastSeenOtid, fromIndex, i); + lastSeenOtid = otids[i]; + fromIndex = i; + ++pos; + } + } + // account for the last distinct otid + otidIntervals[pos] = new OtidInterval(lastSeenOtid, fromIndex, otids.length); + } + + private boolean isDeleted(long otid, long rowId) { + if (otidIntervals == null || rowIds == null) { + return false; + } + // Check if otid is outside the range of all otids present. 
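+      // otidIntervals is sorted by originalTransactionId and rowIds is sorted within each
+      // interval's [fromIndex, toIndex) slice, so a delete lookup is two binary searches:
+      // one to find the interval for the otid, one to find the rowId inside that slice.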
+ if (otid < otidIntervals[0].originalTransactionId + || otid > otidIntervals[otidIntervals.length - 1].originalTransactionId) { + return false; + } + // Create a dummy key for searching the otid in the otid intervals. + OtidInterval key = new OtidInterval(otid, -1, -1); + int pos = Arrays.binarySearch(otidIntervals, key); + if (pos >= 0) { + // Otid with the given value found! Searching now for rowId... + key = otidIntervals[pos]; // Retrieve the actual otidInterval that matched. + // Check if rowId is outside the range of all rowIds present for this otid. + if (rowId < rowIds[key.fromIndex] + || rowId > rowIds[key.toIndex - 1]) { + return false; + } + if (Arrays.binarySearch(rowIds, key.fromIndex, key.toIndex, rowId) >= 0) { + return true; // rowId also found! + } + } + return false; + } + + @Override + public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + throws IOException { + if (rowIds == null || otidIntervals == null) { + return; + } + long[] originalTransactionVector = + batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 + : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + + long[] rowIdVector = + ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + + for (int setBitIndex = selectedBitSet.nextSetBit(0); + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { + long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex] + : repeatedOriginalTransaction ; + long rowId = rowIdVector[setBitIndex]; + if (isDeleted(otid, rowId)) { + selectedBitSet.clear(setBitIndex); + } + } + } + + @Override + public void close() throws IOException { + // do nothing, since the ColumnizedDeleteEventRegistry has already read the data in + // memory and closed the readers. 
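+      // Each per-delta reader was closed as soon as it was exhausted while the sort-merge
+      // in readAllDeleteEventsFromDeletaDeltas() drained it, so there is nothing left to
+      // release here.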
+ } + } + + public class DeleteEventsOverflowMemoryException extends Exception { + private static final long serialVersionUID = 1L; + } +} diff --git ql/src/test/queries/clientpositive/acid_vectorization.q ql/src/test/queries/clientpositive/acid_vectorization.q index 832909b..4c37563 100644 --- ql/src/test/queries/clientpositive/acid_vectorization.q +++ ql/src/test/queries/clientpositive/acid_vectorization.q @@ -15,3 +15,15 @@ set hive.vectorized.execution.enabled=true; delete from acid_vectorized where b = 'foo'; set hive.vectorized.execution.enabled=true; select a, b from acid_vectorized order by a, b; + + +CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); +insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10; +set hive.vectorized.execution.enabled=true; +insert into table acid_fast_vectorized values (1, 'bar'); +set hive.vectorized.execution.enabled=true; +update acid_fast_vectorized set b = 'foo' where b = 'bar'; +set hive.vectorized.execution.enabled=true; +delete from acid_fast_vectorized where b = 'foo'; +set hive.vectorized.execution.enabled=true; +select a, b from acid_fast_vectorized order by a, b; diff --git ql/src/test/results/clientpositive/acid_vectorization.q.out ql/src/test/results/clientpositive/acid_vectorization.q.out index 1792979..24330bd 100644 --- ql/src/test/results/clientpositive/acid_vectorization.q.out +++ ql/src/test/results/clientpositive/acid_vectorization.q.out @@ -60,3 +60,65 @@ POSTHOOK: Input: default@acid_vectorized -1070883071 0ruyd6Y50JpdGRf6HqD -1070551679 iUR3Q -1069736047 k17Am8uPHWk02cEf1jet +PREHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@acid_fast_vectorized +POSTHOOK: query: CREATE TABLE acid_fast_vectorized(a INT, b STRING) CLUSTERED BY(a) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@acid_fast_vectorized +PREHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@acid_fast_vectorized +POSTHOOK: query: insert into table acid_fast_vectorized select cint, cstring1 from alltypesorc where cint is not null order by cint limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@acid_fast_vectorized +POSTHOOK: Lineage: acid_fast_vectorized.a SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +PREHOOK: query: insert into table acid_fast_vectorized values (1, 'bar') +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@acid_fast_vectorized +POSTHOOK: query: insert into table acid_fast_vectorized values (1, 'bar') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@acid_fast_vectorized +POSTHOOK: Lineage: acid_fast_vectorized.a EXPRESSION 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acid_fast_vectorized.b SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar' +PREHOOK: type: QUERY +PREHOOK: Input: default@acid_fast_vectorized +PREHOOK: Output: default@acid_fast_vectorized +POSTHOOK: query: update acid_fast_vectorized set b = 'foo' where b = 'bar' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acid_fast_vectorized +POSTHOOK: Output: default@acid_fast_vectorized +PREHOOK: query: delete from acid_fast_vectorized where b = 'foo' +PREHOOK: type: QUERY +PREHOOK: Input: default@acid_fast_vectorized +PREHOOK: Output: default@acid_fast_vectorized +POSTHOOK: query: delete from acid_fast_vectorized where b = 'foo' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acid_fast_vectorized +POSTHOOK: Output: default@acid_fast_vectorized +PREHOOK: query: select a, b from acid_fast_vectorized order by a, b +PREHOOK: type: QUERY +PREHOOK: Input: default@acid_fast_vectorized +#### A masked pattern was here #### +POSTHOOK: query: select a, b from acid_fast_vectorized order by a, b +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acid_fast_vectorized +#### A masked pattern was here #### +-1073279343 oj1YrV5Wa +-1073051226 A34p7oRr2WvUJNf +-1072910839 0iqrc5 +-1072081801 dPkN74F7 +-1072076362 2uLyD28144vklju213J1mr +-1071480828 aw724t8c5558x2xneC624 +-1071363017 Anj0oF +-1070883071 0ruyd6Y50JpdGRf6HqD +-1070551679 iUR3Q +-1069736047 k17Am8uPHWk02cEf1jet