diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 779da4fbe8..8b37840da0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -79,62 +79,43 @@
   @VisibleForTesting
   public final static class ReaderKey extends RecordIdentifier{
     private long currentTransactionId;
-    /**
-     * This is the value from delta file name which may be different from value encode in
-     * {@link RecordIdentifier#getBucketProperty()} in case of Update/Delete.
-     * So for Acid 1.0 + multi-stmt txn, if {@code isSameRow() == true}, then it must be an update
-     * or delete event. For Acid 2.0 + multi-stmt txn, it must be a delete event.
-     * No 2 Insert events from can ever agree on {@link RecordIdentifier}
-     */
-    private int statementId;//sort on this descending, like currentTransactionId
-
+    private boolean isDeleteEvent = false;
     ReaderKey() {
-      this(-1, -1, -1, -1, 0);
+      this(-1, -1, -1, -1);
     }
-    ReaderKey(long originalTransaction, int bucket, long rowId,
-              long currentTransactionId) {
-      this(originalTransaction, bucket, rowId, currentTransactionId, 0);
-    }
-    /**
-     * @param statementId - set this to 0 if N/A
-     */
     public ReaderKey(long originalTransaction, int bucket, long rowId,
-                     long currentTransactionId, int statementId) {
+                     long currentTransactionId) {
       super(originalTransaction, bucket, rowId);
       this.currentTransactionId = currentTransactionId;
-      this.statementId = statementId;
     }
     @Override
     public void set(RecordIdentifier other) {
       super.set(other);
       currentTransactionId = ((ReaderKey) other).currentTransactionId;
-      statementId = ((ReaderKey) other).statementId;
+      isDeleteEvent = ((ReaderKey) other).isDeleteEvent;
     }
     public void setValues(long originalTransactionId, int bucket, long rowId, long currentTransactionId,
-                          int statementId) {
+                          boolean isDelete) {
       setValues(originalTransactionId, bucket, rowId);
       this.currentTransactionId = currentTransactionId;
-      this.statementId = statementId;
+      this.isDeleteEvent = isDelete;
     }
     @Override
     public boolean equals(Object other) {
       return super.equals(other) &&
-          currentTransactionId == ((ReaderKey) other).currentTransactionId
-          && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
-          ;
+          currentTransactionId == ((ReaderKey) other).currentTransactionId;
     }
     @Override
     public int hashCode() {
       int result = super.hashCode();
       result = 31 * result + (int)(currentTransactionId ^ (currentTransactionId >>> 32));
-      result = 31 * result + statementId;
       return result;
     }
@@ -148,8 +129,12 @@ public int compareTo(RecordIdentifier other) {
         if (currentTransactionId != oth.currentTransactionId) {
           return currentTransactionId < oth.currentTransactionId ? +1 : -1;
         }
-        if(statementId != oth.statementId) {
-          return statementId < oth.statementId ? +1 : -1;
+        if(isDeleteEvent != oth.isDeleteEvent) {
+          //this is to break a tie if insert + delete of a given row is done within the same
+          //txn (so that currentTransactionId is the same for both events) and we want the
+          //delete event to sort 1st since it needs to be sent up so that
+          // OrcInputFormat.getReader(InputSplit inputSplit, Options options) can skip it.
+          return isDeleteEvent ? -1 : +1;
         }
       } else {
         return -1;
       }
@@ -182,7 +167,7 @@ int compareRow(RecordIdentifier other) {
     public String toString() {
       return "{originalTxn: " + getTransactionId() + ", " +
           bucketToString() + ", row: " + getRowId() + ", currentTxn: " +
-          currentTransactionId + ", statementId: "+ statementId + "}";
+          currentTransactionId + "}";
     }
   }
   interface ReaderPair {
@@ -237,8 +222,6 @@ public String toString() {
     private final ReaderKey key;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
-    @Deprecated//HIVE-18158
-    private final int statementId;

     /**
      * Create a reader that reads from the first key larger than minKey to any
@@ -249,18 +232,16 @@ public String toString() {
      * @param maxKey only return keys less than or equal to maxKey if it is
      *               non-null
      * @param options options to provide to read the rows.
-     * @param statementId id of SQL statement within a transaction
      * @throws IOException
      */
     @VisibleForTesting
     ReaderPairAcid(ReaderKey key, Reader reader, RecordIdentifier minKey, RecordIdentifier maxKey,
-                   ReaderImpl.Options options, int statementId) throws IOException {
+                   ReaderImpl.Options options) throws IOException {
       this.reader = reader;
       this.key = key;
       // TODO use stripe statistics to jump over stripes
       recordReader = reader.rowsOptions(options);
-      this.statementId = statementId;
       this.minKey = minKey;
       this.maxKey = maxKey;
       // advance the reader until we reach the minimum key
@@ -300,7 +281,7 @@ public void next(OrcStruct next) throws IOException {
             OrcRecordUpdater.getBucket(nextRecord()),
             OrcRecordUpdater.getRowId(nextRecord()),
             OrcRecordUpdater.getCurrentTransaction(nextRecord()),
-            statementId);
+            OrcRecordUpdater.getOperation(nextRecord()) == OrcRecordUpdater.DELETE_OPERATION);
         // if this record is larger than maxKey, we need to stop
         if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
@@ -353,7 +334,11 @@ public void next(OrcStruct next) throws IOException {
      * TransactionId to use when generating synthetic ROW_IDs
      */
     final long transactionId;
-
+    /**
+     * @param statementId - this should be from delta_x_y_stmtId file name. Imagine 2 load data
+     *                      statements in 1 txn. The stmtId will be embedded in
+     *                      {@link RecordIdentifier#bucketId} via {@link BucketCodec} below
+     */
     OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf,
                        Options mergeOptions, int statementId) throws IOException {
       this.key = key;
@@ -414,7 +399,7 @@ final boolean nextFromCurrentFile(OrcStruct next) throws IOException {
           nextRecord().setFieldValue(OrcRecordUpdater.ROW, getRecordReader().next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0);
+        key.setValues(transactionId, bucketProperty, nextRowId, transactionId, false);
         if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + getMaxKey());
           }
@@ -1058,7 +1043,7 @@ public Options clone() {
         //required (on Tez) that base_x/ doesn't have a file for 'bucket'
         reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
         pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-            eventOptions, 0);
+            eventOptions);
       }
       else {
         pair = new EmptyReaderPair();
       }
@@ -1068,7 +1053,7 @@ public Options clone() {
     else {
       assert reader != null : "no reader? " + mergerOptions.getRootPath();
       pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-          eventOptions, 0);
+          eventOptions);
     }
   }
   minKey = pair.getMinKey();
@@ -1125,7 +1110,7 @@ public Options clone() {
         //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta
         Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf));
         ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
-            deltaEventOptions, deltaDir.getStatementId());
+            deltaEventOptions);
         if (deltaPair.nextRecord() != null) {
           readers.put(key, deltaPair);
         }
@@ -1139,8 +1124,7 @@ public Options clone() {
         assert length >= 0;
         Reader deltaReader = OrcFile.createReader(deltaFile,
             OrcFile.readerOptions(conf).maxLength(length));
         //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
-        ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
-            deltaEventOptions, deltaDir.getStatementId());
+        ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions);
         if (deltaPair.nextRecord() != null) {
           readers.put(key, deltaPair);
         }
@@ -1362,7 +1346,12 @@ public boolean next(RecordIdentifier recordIdentifier,
      * event. If we did want to pass it along, we'd have to include statementId in the row
      * returned so that compaction could write it out or make minor minor compaction understand
      * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any
-     * value in this.*/
+     * value in this.
+     *
+     * todo: this could be simplified since in Acid2 even if you update the same row 2 times in 1
+     * txn, it will have different ROW__IDs, i.e. there is no such thing as multiple versions of
+     * the same physical row. Leave it for now since this Acid reader should go away altogether
+     * and org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader will be used.*/
     boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
     // if we are collapsing, figure out if this is a new row
     if (collapse || isSameRow) {
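[Reviewer note, not part of the patch] The isDeleteEvent tie-break above is easiest to see with a small sketch in the style of TestOrcRawRecordMerger.testOrdering() (further down in this patch); the values are arbitrary and the snippet assumes it sits inside that test class, since the no-arg ReaderKey constructor is package-private:

    // insert and delete of the same row in the same transaction: the keys agree on
    // (originalTransaction, bucketProperty, rowId) and on currentTransactionId,
    // so only the new delete flag decides the order
    ReaderKey insertEvent = new ReaderKey();
    ReaderKey deleteEvent = new ReaderKey();
    insertEvent.setValues(1, 200, 7, 10, false);
    deleteEvent.setValues(1, 200, 7, 10, true);
    assertTrue(deleteEvent.compareTo(insertEvent) < 0); // delete sorts first, so the reader sees it before the insert
    assertTrue(insertEvent.compareTo(deleteEvent) > 0);
    assertTrue(deleteEvent.equals(insertEvent));        // equals() does not look at the flag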
" + mergerOptions.getRootPath(); pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), - eventOptions, 0); + eventOptions); } } minKey = pair.getMinKey(); @@ -1125,7 +1110,7 @@ public Options clone() { //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf)); ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, - deltaEventOptions, deltaDir.getStatementId()); + deltaEventOptions); if (deltaPair.nextRecord() != null) { readers.put(key, deltaPair); } @@ -1139,8 +1124,7 @@ public Options clone() { assert length >= 0; Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty - ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, - deltaEventOptions, deltaDir.getStatementId()); + ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions); if (deltaPair.nextRecord() != null) { readers.put(key, deltaPair); } @@ -1362,7 +1346,12 @@ public boolean next(RecordIdentifier recordIdentifier, * event. If we did want to pass it along, we'd have to include statementId in the row * returned so that compaction could write it out or make minor minor compaction understand * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any - * value in this.*/ + * value in this. + * + * todo: this could be simplified since in Acid2 even if you update the same row 2 times in 1 + * txn, it will have different ROW__IDs, i.e. there is no such thing as multiple versions of + * the same physical row. Leave it for now since this Acid reader should go away altogether + * and org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader will be used.*/ boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier); // if we are collapsing, figure out if this is a new row if (collapse || isSameRow) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 96c5916b37..64428f0d2a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -190,6 +190,14 @@ public boolean hasFooter() { return hasFooter; } + /** + * @return {@code true} if file schema doesn't have Acid metadata columns + * Such file may be in a delta_x_y/ or base_x due to being added via + * "load data" command. It could be at partition|table root due to table having + * been converted from non-acid to acid table. It could even be something like + * "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" if it was written by an + * "insert into t select ... from A union all select ... 
from B" + */ public boolean isOriginal() { return isOriginal; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index d4b29d5c59..8811ef9360 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -214,6 +214,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte } rowIdProjected = areRowIdsProjected(rbCtx); rootPath = orcSplit.getRootDir(); + //why even compute syntheticProps if !isOriginal??? syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList); } @@ -231,7 +232,7 @@ private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long synth } } /** - * See {@link #next(NullWritable, VectorizedRowBatch)} fist and + * See {@link #next(NullWritable, VectorizedRowBatch)} first and * {@link OrcRawRecordMerger.OriginalReaderPair}. * When reading a split of an "original" file and we need to decorate data with ROW__ID. * This requires treating multiple files that are part of the same bucket (tranche for unbucketed @@ -265,6 +266,7 @@ private OffsetAndBucketProperty computeOffsetAndBucket( split.getRootDir(), conf); int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId(); int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf) + //statementId is from directory name (or 0 if there is none) .statementId(syntheticTxnInfo.statementId).bucket(bucketId)); AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf, validTxnList, false, true); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsWithSplitUpdateAndVectorization.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsWithSplitUpdateAndVectorization.java new file mode 100644 index 0000000000..8ee50b9460 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsWithSplitUpdateAndVectorization.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * run all the TestTxnCommands tests but vectorized + */ +public class TestTxnCommandsWithSplitUpdateAndVectorization extends TestTxnCommands { + + public TestTxnCommandsWithSplitUpdateAndVectorization() { + super(); + } + + @Override + void initHiveConf() { + super.initHiveConf();; + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 12083fd79a..bc6e230bd2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -71,8 +71,11 @@ public String toString() { public void setUp() throws Exception { setUpInternal(); } - void setUpInternal() throws Exception { + void initHiveConf() { hiveConf = new HiveConf(this.getClass()); + } + void setUpInternal() throws Exception { + initHiveConf(); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java index ccc5f88f79..e9f1e5369b 100644 --- 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java
index ccc5f88f79..e9f1e5369b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java
@@ -54,12 +54,11 @@ public void testHashEquals() throws Exception {
     int bucketId = ThreadLocalRandom.current().nextInt(1, 512);
     long rowId = ThreadLocalRandom.current().nextLong(1, 10000000000L);
     long currTxn = origTxn + ThreadLocalRandom.current().nextLong(0, 10000000000L);
-    int stmtId = ThreadLocalRandom.current().nextInt(1, 512);
     RecordIdentifier left = new RecordIdentifier(origTxn, bucketId, rowId);
     RecordIdentifier right = new RecordIdentifier(origTxn, bucketId, rowId);
-    OrcRawRecordMerger.ReaderKey rkLeft = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, rowId, currTxn, stmtId);
-    OrcRawRecordMerger.ReaderKey rkRight = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, rowId, currTxn, stmtId);
+    OrcRawRecordMerger.ReaderKey rkLeft = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, rowId, currTxn);
+    OrcRawRecordMerger.ReaderKey rkRight = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, rowId, currTxn);

     assertEquals("RecordIdentifier.equals", left, right);
     assertEquals("RecordIdentifier.hashCode", left.hashCode(), right.hashCode());
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index bbd040a9df..a409e0f407 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -78,12 +78,11 @@ public class TestOrcRawRecordMerger {
   private static final Logger LOG = LoggerFactory.getLogger(TestOrcRawRecordMerger.class);
-  //todo: why is statementId -1?
   @Test
   public void testOrdering() throws Exception {
     ReaderKey left = new ReaderKey(100, 200, 1200, 300);
     ReaderKey right = new ReaderKey();
-    right.setValues(100, 200, 1000, 200,1);
+    right.setValues(100, 200, 1000, 200, false);
     assertTrue(right.compareTo(left) < 0);
     assertTrue(left.compareTo(right) > 0);
     assertEquals(false, left.equals(right));
@@ -92,16 +91,16 @@ public void testOrdering() throws Exception {
     assertEquals(true, right.equals(left));
     right.setRowId(2000);
     assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 4,-1);
-    right.setValues(100, 2, 3, 4,-1);
+    left.setValues(1, 2, 3, 4, false);
+    right.setValues(100, 2, 3, 4, false);
     assertTrue(left.compareTo(right) < 0);
     assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 4,-1);
-    right.setValues(1, 100, 3, 4,-1);
+    left.setValues(1, 2, 3, 4, false);
+    right.setValues(1, 100, 3, 4, false);
     assertTrue(left.compareTo(right) < 0);
     assertTrue(right.compareTo(left) > 0);
-    left.setValues(1, 2, 3, 100,-1);
-    right.setValues(1, 2, 3, 4,-1);
+    left.setValues(1, 2, 3, 100, false);
+    right.setValues(1, 2, 3, 4, false);
     assertTrue(left.compareTo(right) < 0);
     assertTrue(right.compareTo(left) > 0);
@@ -193,7 +192,7 @@ public void testReaderPair() throws Exception {
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
     ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, minKey, maxKey,
-        new Reader.Options(), 0);
+        new Reader.Options());
     RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
@@ -219,7 +218,7 @@ public void testReaderPairNoMin() throws Exception {
     Reader reader = createMockReader();
     ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, null, null,
-        new Reader.Options(), 0);
+        new Reader.Options());
     RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());