diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index c364343528..c9adab5fd3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1869,7 +1869,10 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte reporter.setStatus(inputSplit.toString()); - boolean isFastVectorizedReaderAvailable = + //TODO: why would inputSplit be something other than OrcSplit? If that is really possible we + //have to retain VectorizedOrcAcidRowReader or make VectorizedOrcAcidRowBatchReader handle + //non orc splits + boolean isFastVectorizedReaderAvailable = vectorMode && VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit); if (vectorMode && isFastVectorizedReaderAvailable) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 1e19a911a6..461a5d7525 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -62,21 +62,22 @@ private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class); - public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; - public static final String ACID_FORMAT = "_orc_acid_version"; - public static final int ORC_ACID_VERSION = 0; + static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; + private static final String ACID_FORMAT = "_orc_acid_version"; + private static final int ORC_ACID_VERSION = 0; final static int INSERT_OPERATION = 0; final static int UPDATE_OPERATION = 1; final static int DELETE_OPERATION = 2; - + //column indexes of corresponding data in storage layer final static int OPERATION = 0; final static int ORIGINAL_TRANSACTION = 1; final static int BUCKET = 2; final static int ROW_ID = 3; final static int CURRENT_TRANSACTION = 4; final static int ROW = 5; + //total number of fields (above) final static int FIELDS = 6; final static int DELTA_BUFFER_SIZE = 16 * 1024; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 1e16f09bc7..3fb7b0e38e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -37,9 +37,13 @@ import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -57,32 +61,56 @@ * directly read from the base files/insert_only deltas in vectorized row batches. The deleted * rows can then be easily indicated via the 'selected' field of the vectorized row batch. * Refer HIVE-14233 for more details. 
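For context on the 'selected' mechanism mentioned in the class javadoc above, here is a minimal illustrative sketch (not part of the patch) of how a BitSet of surviving rows is typically projected onto a VectorizedRowBatch via its selected/selectedInUse fields; the class and method names are hypothetical, and the patch does the equivalent work inside next().

    import java.util.BitSet;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    final class SelectedBitSetSketch {
      // Marks only the surviving row positions of a batch as selected.
      static void applySurvivors(BitSet survivors, int baseSize, VectorizedRowBatch out) {
        if (survivors.cardinality() == baseSize) {
          // nothing was deleted/filtered: the whole batch is selected
          out.size = baseSize;
          out.selectedInUse = false;
        } else {
          // expose only the surviving row positions through 'selected'
          out.size = survivors.cardinality();
          out.selectedInUse = true;
          int i = 0;
          for (int bit = survivors.nextSetBit(0); bit >= 0; bit = survivors.nextSetBit(bit + 1)) {
            out.selected[i++] = bit;
          }
        }
      }
    }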
+ * + * + * todo: annotate the plan to indicate which reader is used? + * better yet remove the VectorizedOrcAcidRowReader - then there is just 1 vectorized path + * Also, figure out how to make it not use LLAP cache for original files with ROW__ID projection + * This way everything still vectorizes + * todo: add test that loads a lot of data, like 100k to multiple stripes/row groups + * add some checksum queries to make sure ROW__IDs are correct/distinct etc + * Add a few queries with predicates to make sure ROW__IDs are correct, i.e. that we assign them + * consistently + * Can you just convert 100k table to acid? maybe not - will screw up other tests + * */ public class VectorizedOrcAcidRowBatchReader implements org.apache.hadoop.mapred.RecordReader { private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class); - public org.apache.hadoop.mapred.RecordReader baseReader; - protected VectorizedRowBatchCtx rbCtx; - protected VectorizedRowBatch vectorizedRowBatchBase; + private org.apache.hadoop.mapred.RecordReader baseReader; + private final VectorizedRowBatchCtx rbCtx; + private VectorizedRowBatch vectorizedRowBatchBase; private long offset; private long length; protected float progress = 0.0f; protected Object[] partitionValues; - protected boolean addPartitionCols = true; - private ValidTxnList validTxnList; - protected DeleteEventRegistry deleteEventRegistry; - protected StructColumnVector recordIdColumnVector; - private org.apache.orc.Reader.Options readerOptions; + private boolean addPartitionCols = true; + private final ValidTxnList validTxnList; + private final DeleteEventRegistry deleteEventRegistry; + private final StructColumnVector recordIdColumnVector; + private final Reader.Options readerOptions; + private final boolean isOriginal; + /** + * something further in the data pipeline wants {@link VirtualColumn#ROWID} + */ + private final boolean rowIdProjected; + //partition root + private final Path rootPath; + /** + * for reading "original" files + */ + private final OffsetAndBucketProperty syntheticProps; + private RecordReader innerReader; - public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, + VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { - this.init(inputSplit, conf, reporter, Utilities.getVectorizedRowBatchCtx(conf)); + this(inputSplit, conf, reporter, Utilities.getVectorizedRowBatchCtx(conf)); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. 
- final RecordReader innerReader = reader.rowsOptions(readerOptions.range(offset, length)); + innerReader = reader.rowsOptions(readerOptions.range(offset, length)); baseReader = new org.apache.hadoop.mapred.RecordReader() { @Override @@ -121,12 +149,13 @@ public float getProgress() throws IOException { public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter, org.apache.hadoop.mapred.RecordReader baseReader, VectorizedRowBatchCtx rbCtx) throws IOException { - this.init(inputSplit, conf, reporter, rbCtx); + this(inputSplit, conf, reporter, rbCtx); this.baseReader = baseReader; + this.innerReader = null; this.vectorizedRowBatchBase = baseReader.createValue(); } - private void init(InputSplit inputSplit, JobConf conf, Reporter reporter, + private VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter, VectorizedRowBatchCtx rowBatchCtx) throws IOException { this.rbCtx = rowBatchCtx; final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); @@ -143,8 +172,7 @@ private void init(InputSplit inputSplit, JobConf conf, Reporter reporter, final OrcSplit orcSplit = (OrcSplit) inputSplit; reporter.setStatus(orcSplit.toString()); - readerOptions = OrcInputFormat.createOptionsForReader(conf); - readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf)); this.offset = orcSplit.getStart(); this.length = orcSplit.getLength(); @@ -167,46 +195,118 @@ private void init(InputSplit inputSplit, JobConf conf, Reporter reporter, deleteEventReaderOptions.range(0, Long.MAX_VALUE); // Disable SARGs for deleteEventReaders, as SARGs have no meaning. deleteEventReaderOptions.searchArgument(null, null); + DeleteEventRegistry der = null; try { // See if we can load all the delete events from all the delete deltas in memory... - this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); } catch (DeleteEventsOverflowMemoryException e) { // If not, then create a set of hanging readers that do sort-merge to find the next smallest // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). 
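To clarify the contract behind the two DeleteEventRegistry implementations chosen above, here is a simplified, hedged sketch (not part of the patch): a delete event is keyed by (originalTransactionId, bucketProperty, rowId), and any row whose key matches a delete event is cleared from the selected BitSet. The class below is hypothetical and uses a plain HashSet; the real ColumnizedDeleteEventRegistry keeps the events in sorted primitive arrays, and SortMergedDeleteEventRegistry sort-merges them on demand.

    import java.util.BitSet;
    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;

    final class SimpleDeleteRegistrySketch {
      private static final class Key {
        final long otid; final int bucketProperty; final long rowId;
        Key(long otid, int bucketProperty, long rowId) {
          this.otid = otid; this.bucketProperty = bucketProperty; this.rowId = rowId;
        }
        @Override public boolean equals(Object o) {
          if (!(o instanceof Key)) { return false; }
          Key k = (Key) o;
          return otid == k.otid && bucketProperty == k.bucketProperty && rowId == k.rowId;
        }
        @Override public int hashCode() { return Objects.hash(otid, bucketProperty, rowId); }
      }
      private final Set<Key> deletes = new HashSet<>();

      void addDeleteEvent(long otid, int bucketProperty, long rowId) {
        deletes.add(new Key(otid, bucketProperty, rowId));
      }
      // Clears the bit of every row that has a matching delete event.
      void findDeletedRecords(long[] otids, long[] buckets, long[] rowIds, int size, BitSet selectedBitSet) {
        for (int i = 0; i < size; i++) {
          if (deletes.contains(new Key(otids[i], (int) buckets[i], rowIds[i]))) {
            selectedBitSet.clear(i);
          }
        }
      }
    }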
- this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + der = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); } - - recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); + this.deleteEventRegistry = der; + isOriginal = orcSplit.isOriginal(); + if(isOriginal) { + recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + new LongColumnVector(), new LongColumnVector(), new LongColumnVector()); + } + else { + recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); + } + rowIdProjected = areRowIdsProjected(rbCtx); + rootPath = orcSplit.getRootDir(); + syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList); } /** + * Used for generating synthetic ROW__IDs for reading "original" files + */ + private static final class OffsetAndBucketProperty { + private final long rowIdOffset; + private final int bucketProperty; + private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) { + this.rowIdOffset = rowIdOffset; + this.bucketProperty = bucketProperty; + } + } + /** + * See {@link #next(NullWritable, VectorizedRowBatch)} first and + * {@link OrcRawRecordMerger.OriginalReaderPair}. + * Used when reading a split of an "original" file and we need to decorate the data with ROW__ID. + * This requires handling multiple files that are part of the same bucket (tranche for unbucketed + * tables) as a single logical file to number rowids consistently. + * + * This logic is executed per split of every "original" file. The computed result is the same + * for every split from the same file, so this could be optimized by moving it to before/during + * split computation and passing the info in the split. ToDo: file Jira for this + */ + private OffsetAndBucketProperty computeOffsetAndBucket( + OrcSplit split, JobConf conf, ValidTxnList validTxnList) throws IOException { + if(!(split.isOriginal() && (rowIdProjected || !deleteEventRegistry.isEmpty()))) { + return new OffsetAndBucketProperty(0,0); + } + long rowIdOffset = 0; + int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId(); + int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId)); + AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf, validTxnList); + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucketId() != bucketId) { + continue; + } + if (f.getFileStatus().getPath().equals(split.getPath())) { + //'f' is the file this split comes from + break; + } + Reader reader = OrcFile.createReader(f.getFileStatus().getPath(), + OrcFile.readerOptions(conf)); + rowIdOffset += reader.getNumberOfRows(); + } + return new OffsetAndBucketProperty(rowIdOffset, bucketProperty); + } + /** * Returns whether it is possible to create a valid instance of this class for a given split. * @param conf is the job configuration - * @param inputSplit * @return true if it is possible, else false.
*/ - public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) { + static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) + throws IOException { if (!(inputSplit instanceof OrcSplit)) { return false; // must be an instance of OrcSplit. } - // First check if we are reading any original files in the split. - // To simplify the vectorization logic, the vectorized acid row batch reader does not handle - // original files for now as they have a different schema than a regular ACID file. final OrcSplit split = (OrcSplit) inputSplit; - if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) { - // When split-update is turned on for ACID, a more optimized vectorized batch reader - // can be created. But still only possible when we are *NOT* reading any originals. - return true; + if(Utilities.getVectorizedRowBatchCtx(conf) == null) { + //it is expected that the check whether vectorization is possible has already happened + throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + + split.getRootDir()); } - return false; // no split-update or possibly reading originals! + return true; } + private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { + if(rbCtx.getVirtualColumnCount() == 0) { + return false; + } + for(VirtualColumn vc : rbCtx.getNeededVirtualColumns()) { + if(vc == VirtualColumn.ROWID) { + //The query needs ROW__ID: maybe explicitly asked for, maybe it's part of an + // Update/Delete statement. + //Either way, we need to decorate "original" rows with row__id + return true; + } + } + return false; + } + /** + * ToDo: refactor/merge with {@link OrcInputFormat#getReader(InputSplit, AcidInputFormat.Options)} + */ private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { Path path = orcSplit.getPath(); Path root; if (orcSplit.hasBase()) { if (orcSplit.isOriginal()) { - root = path.getParent(); + root = orcSplit.getRootDir(); } else { root = path.getParent().getParent(); } @@ -216,6 +316,31 @@ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas()); } + /** + * There are 2 types of schema from the {@link #baseReader} that this handles. In the case + * where the data was written to a transactional table from the start, every row is decorated with + * transaction related info and looks like <op, otid, writerId, rowid, ctid, <f1, ... fn>>. + * + * The other case is when data was written to a non-transactional table and thus only has the user + * data: <f1, ... fn>. This table was then converted to a transactional table, but the data + * files are not changed until major compaction. These are the "original" files. + * + * In this case we may need to decorate the outgoing data with transactional column values at + * read time. (It's done somewhat out of band via VectorizedRowBatchCtx - ask Teddy Choi). + * The "otid, writerId, rowid" columns represent {@link RecordIdentifier}. They are assigned + * each time the table is read in a way that needs to project {@link VirtualColumn#ROWID}. + * Major compaction will attach these values to each row permanently. + * It's critical that these generated column values are assigned exactly the same way by each + * read of the same row and by the Compactor. + * See {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorMR} and + * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} for the Compactor read path.
+ * (Longer term should make compactor use this class) + * + * This only decorates original rows with metadata if something above is requesting these values + * or if there are Delete events to apply. + * + * @return false when there is no more data, i.e. {@code value} is empty + */ @Override public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { try { @@ -257,12 +382,60 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti // When selectedInUse is set to false, everything in the batch is selected. selectedBitSet.set(0, vectorizedRowBatchBase.size, true); } - - // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols; + if(isOriginal) { + /* + * If we are reading an original file and there are deletes, we must produce synthetic ROW__IDs in order + * to see if any deletes apply + */ + if(rowIdProjected || !deleteEventRegistry.isEmpty()) { + if(innerReader == null) { + throw new IllegalStateException(getClass().getName() + " requires " + + org.apache.orc.RecordReader.class + + " to handle original files that require ROW__IDs: " + rootPath); + } + /** + * {@link RecordIdentifier#getTransactionId()} + */ + recordIdColumnVector.fields[0].noNulls = true; + recordIdColumnVector.fields[0].isRepeating = true; + //all "original" data is considered to be written by txnid:0, which committed + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0; + /** + * This is {@link RecordIdentifier#getBucketProperty()} + * Also see {@link BucketCodec} + */ + recordIdColumnVector.fields[1].noNulls = true; + recordIdColumnVector.fields[1].isRepeating = true; + ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty; + /** + * {@link RecordIdentifier#getRowId()} + */ + recordIdColumnVector.fields[2].noNulls = true; + recordIdColumnVector.fields[2].isRepeating = false; + long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector; + for(int i = 0; i < vectorizedRowBatchBase.size; i++) { + //baseReader.getRowNumber() seems to point at the start of the batch todo: validate + rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i; + } + //Now populate a structure to use to apply delete events + innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS]; + innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0]; + innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; + innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; + } + } + else { + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + /** + * All "original" data belongs to txnid:0 and is always valid/committed for every reader, + * so there is no need to filter wrt {@link validTxnList} + */ + } // Case 2- find rows which have been deleted.
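To make the synthetic ROW__ID scheme above concrete, here is a hedged sketch (not part of the patch) of how one such identifier could be assembled. rowCountsOfEarlierFiles and rowNumberInThisFile are hypothetical stand-ins for what computeOffsetAndBucket() and innerReader.getRowNumber() supply; the result mirrors (txnid 0, encoded bucket property, rowIdOffset + row number).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.BucketCodec;
    import org.apache.hadoop.hive.ql.io.RecordIdentifier;

    final class SyntheticRowIdSketch {
      static RecordIdentifier syntheticRowId(Configuration conf, int bucketId,
          long[] rowCountsOfEarlierFiles, long rowNumberInThisFile) {
        // rows of earlier "original" files of the same bucket/tranche shift this file's row ids
        long rowIdOffset = 0;
        for (long rows : rowCountsOfEarlierFiles) {
          rowIdOffset += rows;
        }
        // same encoding the patch uses: statementId 0, writer id = bucketId
        int bucketProperty = BucketCodec.V1.encode(
            new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
        // all "original" data is treated as written by the committed transaction 0
        return new RecordIdentifier(0L, bucketProperty, rowIdOffset + rowNumberInThisFile);
      }
    }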
- this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase.cols, + this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector, vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { @@ -283,30 +456,39 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } } - // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. - // NOTE: We only link up the user columns and not the ACID metadata columns because this - // vectorized code path is not being used in cases of update/delete, when the metadata columns - // would be expected to be passed up the operator pipeline. This is because - // currently the update/delete specifically disable vectorized code paths. - // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() - StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; - // Transfer columnVector objects from base batch to outgoing batch. - System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); - if (rbCtx != null) { - recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]; - recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; - recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; + if(isOriginal) { + /*Just copy the payload. {@link recordIdColumnVector} has already been populated*/ + System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, + value.getDataColumnCount()); + } + else { + // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. + // NOTE: We only link up the user columns and not the ACID metadata columns because this + // vectorized code path is not being used in cases of update/delete, when the metadata columns + // would be expected to be passed up the operator pipeline. This is because + // currently the update/delete specifically disable vectorized code paths. + // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() + StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; + // Transfer columnVector objects from base batch to outgoing batch. 
+ System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); + if(rowIdProjected) { + recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]; + recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; + recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; + } + } + if(rowIdProjected) { rbCtx.setRecordIdColumnVector(recordIdColumnVector); } progress = baseReader.getProgress(); return true; } - protected void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { + private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet); } - protected void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { + private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { // When we have repeating values, we can unset the whole bitset at once // if the repeating value is not a valid transaction. @@ -387,6 +569,11 @@ DeleteEventRegistry getDeleteEventRegistry() { * @throws IOException */ public void close() throws IOException; + + /** + * @return {@code true} if no delete events were found + */ + boolean isEmpty(); } /** @@ -400,10 +587,10 @@ DeleteEventRegistry getDeleteEventRegistry() { private OrcRawRecordMerger deleteRecords; private OrcRawRecordMerger.ReaderKey deleteRecordKey; private OrcStruct deleteRecordValue; - private boolean isDeleteRecordAvailable = true; + private Boolean isDeleteRecordAvailable = null; private ValidTxnList validTxnList; - public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) + SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) throws IOException { final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltas.length > 0) { @@ -428,6 +615,13 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt } @Override + public boolean isEmpty() { + if(isDeleteRecordAvailable == null) { + throw new IllegalStateException("Not yet initialized"); + } + return !isDeleteRecordAvailable; + } + @Override public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (!isDeleteRecordAvailable) { @@ -546,7 +740,7 @@ public void close() throws IOException { */ private int bucketProperty; private long rowId; - public DeleteRecordKey() { + DeleteRecordKey() { this.originalTransactionId = -1; this.rowId = -1; } @@ -596,7 +790,7 @@ public String toString() { private boolean isBucketPropertyRepeating; private final boolean isBucketedTable; - public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, + DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, ValidTxnList validTxnList, boolean isBucketedTable) throws IOException { this.recordReader = deleteDeltaReader.rowsOptions(readerOptions); this.bucketForSplit = bucket; @@ -741,8 +935,9 @@ public int compareTo(CompressedOtid other) { private long rowIds[]; private CompressedOtid compressedOtids[]; private ValidTxnList validTxnList; + private Boolean isEmpty = null; - public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, + ColumnizedDeleteEventRegistry(JobConf conf, 
OrcSplit orcSplit, Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); @@ -804,6 +999,7 @@ public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, readAllDeleteEventsFromDeleteDeltas(); } } + isEmpty = compressedOtids == null || rowIds == null; } catch(IOException|DeleteEventsOverflowMemoryException e) { close(); // close any open readers, if there was some exception during initialization. throw e; // rethrow the exception so that the caller can handle. @@ -910,7 +1106,13 @@ private boolean isDeleted(long otid, int bucketProperty, long rowId) { } return false; } - + @Override + public boolean isEmpty() { + if(isEmpty == null) { + throw new IllegalStateException("Not yet initialized"); + } + return isEmpty; + } @Override public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index 885ef83381..90403e1b8a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -40,7 +40,12 @@ * support tables and partitions stored in the ACID format. It works by using * the non-vectorized ACID reader and moving the data into a vectorized row * batch. + * + * Is there a reason to still have this when we have VectorizedOrcAcidRowBatchReader? + * Once VectorizedOrcAcidRowBatchReader handles isOriginal, there is really no reason + * to have this, right? */ +@Deprecated public class VectorizedOrcAcidRowReader implements org.apache.hadoop.mapred.RecordReader { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 39d6b2b414..ced0325e95 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; @@ -37,7 +36,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; -import org.junit.After; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -56,9 +54,9 @@ * test AC=true, and AC=false with commit/rollback/exception and test resulting data. * * Can also test, calling commit in AC=true mode, etc, toggling AC... 
- * - * Tests here are for multi-statement transactions (WIP) and those that don't need to - * run with Acid 2.0 (see subclasses of TestTxnCommands2) + * + * Tests here are for multi-statement transactions (WIP) and others. + * Most tests use bucketed tables. */ public class TestTxnCommands extends TxnCommandsBaseForTests { static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index c827dc4a0e..e310b9d025 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -522,5 +523,88 @@ public void testCtasBucketed() throws Exception { // Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode()); Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support")); } + @Test + public void testVectorizedWithDelete() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + //this enables vectorization of ROW__ID + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T(a int, b int) stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("insert into T(a,b) values(1,2),(3,4)"); + runStatementOnDriver("delete from T where b = 4"); + List<String> rs = runStatementOnDriver("select a, b from T"); + Assert.assertEquals(1, rs.size()); + } + /** + * maybe there is no issue surfacing this if we only have 1 vectorized reader for acid... + * need to handle the case with delete events for this*/ + @Test + public void testNonAcidToAcidVectorzied() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + //this enables vectorization of ROW__ID + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T(a int, b int) stored as orc"); + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values)); + //, 'transactional_properties'='default' + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')"); + //this uses VectorizedOrcAcidRowBatchReader + List<String> rs = runStatementOnDriver("select a from T where b > 6 order by a"); + String[][] expected = { + {"6", ""}, + {"9", ""}, + }; + checkExpected(rs, expected, "After conversion"); + Assert.assertEquals(Integer.toString(6), rs.get(0)); + Assert.assertEquals(Integer.toString(9), rs.get(1)); + + //why isn't PPD working....
- it is working, but the storage layer doesn't do row level filtering; only row group level +//this uses VectorizedOrcAcidRowBatchReader + rs = runStatementOnDriver("select ROW__ID, a from T where b > 6 order by a"); + String[][] expected1 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} + }; + checkExpected(rs, expected1, "After conversion with VC1"); + + //this uses VectorizedOrcAcidRowBatchReader + rs = runStatementOnDriver("select ROW__ID, a from T where b > 0 order by a"); + String[][] expected2 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}", "1"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}", "2"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}", "5"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} + }; + checkExpected(rs, expected2, "After conversion with VC2"); + + //doesn't vectorize (uses neither of the Vectorized Acid readers) + rs = runStatementOnDriver("select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a"); + Assert.assertEquals("", 2, rs.size()); + String[][] expected3 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"} + }; + checkExpected(rs, expected3, "After non-vectorized read"); + Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + + runStatementOnDriver("update T set b = 17 where a = 1"); + //this should use VectorizedOrcAcidRowReader + rs = runStatementOnDriver("select ROW__ID, b from T where b > 0 order by a"); + String[][] expected4 = { + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}","17"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}","4"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}","6"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}","8"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}","10"} + }; + checkExpected(rs, expected4, "After conversion with VC4"); + + //this should not vectorize at all + rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, b from T where b > 0 order by a"); + } }
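As a closing note on the expected ROW__IDs in the tests above, a small hedged sketch (not part of the patch) that ties the bucketid value 536870912 back to BucketCodec: decoding it should give writer (bucket) id 0 and statement id 0, which is what the determineVersion/decodeWriterId assertion in the test relies on. The class name below is hypothetical.

    import org.apache.hadoop.hive.ql.io.BucketCodec;

    final class BucketPropertyDecodeSketch {
      public static void main(String[] args) {
        int bucketProperty = 536870912; // the "bucketid" seen in the expected ROW__IDs
        BucketCodec codec = BucketCodec.determineVersion(bucketProperty);
        System.out.println(codec.decodeWriterId(bucketProperty));    // expected: 0
        System.out.println(codec.decodeStatementId(bucketProperty)); // expected: 0
      }
    }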