diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 1378a01f44..3113bef14d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -107,6 +107,7 @@
   private final ReadPipeline rp;
   private final ExecutorService executor;
   private final boolean isAcidScan;
+  private final boolean isAcidFormat;
 
   /**
    * Creates the record reader and checks the input-specific compatibility.
@@ -187,10 +188,15 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
     this.isVectorized = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
     if (isAcidScan) {
+      OrcSplit orcSplit = (OrcSplit) split;
       this.acidReader = new VectorizedOrcAcidRowBatchReader(
-          (OrcSplit) split, jobConf, Reporter.NULL, null, rbCtx, true);
+          orcSplit, jobConf, Reporter.NULL, null, rbCtx, true);
+      isAcidFormat = !orcSplit.isOriginal();
+    } else {
+      isAcidFormat = false;
     }
-    this.includes = new IncludesImpl(tableIncludedCols, isAcidScan, rbCtx,
+
+    this.includes = new IncludesImpl(tableIncludedCols, isAcidScan && isAcidFormat, rbCtx,
         schema, job, isAcidScan && acidReader.includeAcidColumns());
 
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
@@ -397,7 +403,7 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
       counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
       return false;
     }
-    if (isAcidScan) {
+    if (isAcidFormat) {
       vrb.selectedInUse = true;//why?
       if (isVectorized) {
         // TODO: relying everywhere on the magical constants and columns being together means ACID
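Note on the LlapRecordReader change: `isAcidScan` says the *read* has ACID semantics, while the new `isAcidFormat` says the rows are *physically stored* with the ACID meta-column struct. An "original" split (a file written before the table was converted to ACID) satisfies the first but not the second, which is why `next()` now branches on `isAcidFormat`. A minimal standalone sketch of the relationship; `AcidFlagSketch` and its parameter names are illustrative, not Hive API:

```java
/**
 * Illustrative sketch only -- not Hive code. Shows how the patch derives
 * isAcidFormat from isAcidScan plus OrcSplit.isOriginal().
 */
final class AcidFlagSketch {

  /** An ACID *scan* only implies ACID *layout* when the split is not "original". */
  static boolean isAcidFormat(boolean isAcidScan, boolean splitIsOriginal) {
    // "Original" splits predate the ACID conversion: their rows carry no
    // (operation, writeId, bucket, rowId) wrapper struct, so the batch
    // layout stays flat even though the scan has ACID semantics.
    return isAcidScan && !splitIsOriginal;
  }

  public static void main(String[] args) {
    System.out.println(isAcidFormat(true, false));  // native ACID base/delta: true
    System.out.println(isAcidFormat(true, true));   // original file under ACID table: false
    System.out.println(isAcidFormat(false, false)); // non-ACID scan: false
  }
}
```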
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 3eadc26a75..0004c67178 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -273,38 +273,17 @@ public long getColumnarProjectionSize() {
 
   @Override
   public boolean canUseLlapIo(Configuration conf) {
-    final boolean hasDelta = deltas != null && !deltas.isEmpty();
-    final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
-    final boolean isVectorized = Utilities.getIsVectorized(conf);
-    Boolean isSplitUpdate = null;
-    if (isAcidRead) {
-      final AcidUtils.AcidOperationalProperties acidOperationalProperties
-          = AcidUtils.getAcidOperationalProperties(conf);
-      isSplitUpdate = acidOperationalProperties.isSplitUpdate();
-      // TODO: this is brittle. Who said everyone has to upgrade using upgrade process?
-      assert isSplitUpdate : "should be true in Hive 3.0";
-    }
-
-    if (isOriginal) {
-      if (!isAcidRead && !hasDelta) {
-        // Original scan only
-        return true;
+    if (AcidUtils.isFullAcidScan(conf)) {
+      if (HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED)
+          && Utilities.getIsVectorized(conf)) {
+        boolean hasDeleteDelta = deltas != null && !deltas.isEmpty();
+        return VectorizedOrcAcidRowBatchReader.canUseLlapIoForAcid(this, hasDeleteDelta, conf);
+      } else {
+        return false;
       }
     } else {
-      boolean isAcidEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED);
-      if (isAcidEnabled && isAcidRead && hasBase && isVectorized) {
-        if (hasDelta) {
-          if (isSplitUpdate) {
-            // Base with delete deltas
-            return true;
-          }
-        } else {
-          // Base scan only
-          return true;
-        }
-      }
+      return true;
     }
-    return false;
   }
 
   /**
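Note on `canUseLlapIo`: the rewrite reduces the old flag matrix to three cases. Non-ACID scans always qualify; ACID scans require both `LLAP_IO_ACID_ENABLED` and vectorization; and every split-specific question (original or not, delete deltas or not) is delegated to `VectorizedOrcAcidRowBatchReader.canUseLlapIoForAcid`. A hedged paraphrase as standalone code; the boolean parameters stand in for values the real method derives from `Configuration` and the split:

```java
/**
 * Paraphrase of the new OrcSplit.canUseLlapIo() decision -- not Hive code.
 */
final class CanUseLlapIoSketch {

  static boolean canUseLlapIo(boolean isFullAcidScan, boolean llapIoAcidEnabled,
      boolean isVectorized, boolean acidReaderAllows) {
    if (!isFullAcidScan) {
      return true;               // non-ACID scans always qualify for LLAP IO
    }
    if (llapIoAcidEnabled && isVectorized) {
      return acidReaderAllows;   // i.e. canUseLlapIoForAcid(split, hasDeleteDelta, conf)
    }
    return false;                // ACID path needs the config flag AND vectorization
  }

  public static void main(String[] args) {
    // Even with the flag and vectorization on, the final say belongs to
    // VectorizedOrcAcidRowBatchReader, not to OrcSplit itself.
    System.out.println(canUseLlapIo(true, true, true, false));    // false
    System.out.println(canUseLlapIo(false, false, false, false)); // true
  }
}
```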
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 374b1058e4..2543dc6fc4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -272,7 +272,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte
      * isOriginal - don't have meta columns - nothing to skip
      * there no relevant delete events && ROW__ID is not needed higher up
      * (e.g. this is not a delete statement)*/
-    if (!isOriginal && deleteEventRegistry.isEmpty() && !rowIdProjected) {
+    if (deleteEventRegistry.isEmpty() && !rowIdProjected) {
       Path parent = orcSplit.getPath().getParent();
       while (parent != null && !rootPath.equals(parent)) {
         if (parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
@@ -749,7 +749,7 @@ dropped by the Reader (I guess because of orc.impl.SchemaEvolution)
    * @param hasDeletes - if there are any deletes that apply to this split
    * todo: HIVE-17944
    */
-  static boolean canUseLlapForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
+  static boolean canUseLlapIoForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
     if(!split.isOriginal()) {
       return true;
     }
@@ -906,12 +906,8 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
       }
     }
 
-    if (isOriginal) {
-      /* Just copy the payload. {@link recordIdColumnVector} has already been populated */
-      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
-    } else {
-      copyFromBase(value);
-    }
+    copyFromBase(value);
+
     if (rowIdProjected) {
       int ix = rbCtx.findVirtualColumnNum(VirtualColumn.ROWID);
       value.cols[ix] = recordIdColumnVector;
@@ -923,7 +919,11 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
   //ColumnVectors for acid meta cols to create a single ColumnVector
   //representing RecordIdentifier and (optionally) set it in 'value'
   private void copyFromBase(VectorizedRowBatch value) {
-    assert !isOriginal;
+    if (isOriginal) {
+      /* Just copy the payload. {@link recordIdColumnVector} has already been populated if needed */
+      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
+      return;
+    }
     if (isFlatPayload) {
       int payloadCol = includeAcidColumns ? OrcRecordUpdater.ROW : 0;
       // Ignore the struct column and just copy all the following data columns.
@@ -1266,7 +1266,6 @@ public void close() throws IOException {
    * A simple wrapper class to hold the (owid, bucketProperty, rowId) pair.
    */
   static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
-    private static final DeleteRecordKey otherKey = new DeleteRecordKey();
     private long originalWriteId;
     /**
      * see {@link BucketCodec}
@@ -1288,25 +1287,29 @@ public int compareTo(DeleteRecordKey other) {
       if (other == null) {
         return -1;
       }
-      if (originalWriteId != other.originalWriteId) {
-        return originalWriteId < other.originalWriteId ? -1 : 1;
-      }
-      if(bucketProperty != other.bucketProperty) {
-        return bucketProperty < other.bucketProperty ? -1 : 1;
-      }
-      if (rowId != other.rowId) {
-        return rowId < other.rowId ? -1 : 1;
-      }
-      return 0;
+      return compareTo(other.originalWriteId, other.bucketProperty, other.rowId);
     }
+
     private int compareTo(RecordIdentifier other) {
       if (other == null) {
         return -1;
       }
-      otherKey.set(other.getWriteId(), other.getBucketProperty(),
-          other.getRowId());
-      return compareTo(otherKey);
+      return compareTo(other.getWriteId(), other.getBucketProperty(), other.getRowId());
     }
+
+    private int compareTo(long oOriginalWriteId, int oBucketProperty, long oRowId) {
+      if (originalWriteId != oOriginalWriteId) {
+        return originalWriteId < oOriginalWriteId ? -1 : 1;
+      }
+      if (bucketProperty != oBucketProperty) {
+        return bucketProperty < oBucketProperty ? -1 : 1;
+      }
+      if (rowId != oRowId) {
+        return rowId < oRowId ? -1 : 1;
+      }
+      return 0;
+    }
+
     @Override
     public String toString() {
       return "DeleteRecordKey(" + originalWriteId + "," +
diff --git ql/src/test/queries/clientpositive/acid_vectorization_original.q ql/src/test/queries/clientpositive/acid_vectorization_original.q
index 1a9fc573c8..c6d790d1df 100644
--- ql/src/test/queries/clientpositive/acid_vectorization_original.q
+++ ql/src/test/queries/clientpositive/acid_vectorization_original.q
@@ -110,6 +110,10 @@ explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID havi
 -- this test that there are no duplicate ROW__IDs so should produce no output
 -- on LLAP this produces "NULL, 6"; on tez it produces nothing: HIVE-17921
+-- this makes sure that the same code runs in Ptest and on localhost. The target is:
+-- Original split count is 11 grouped split count is 1, for bucket: 1
+set tez.grouping.split-count=1;
+
 select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
 -- this produces nothing (as it should)
 select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null;
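Note on `DeleteRecordKey`: the removed `otherKey` was a static, mutable scratch instance shared by every reader in the JVM, so concurrent `compareTo(RecordIdentifier)` calls could interleave `set(...)` and the subsequent compare. The patch instead routes all comparisons through one field-wise helper that touches no shared state. A self-contained sketch of the resulting pattern (it mirrors the patch; the `Sketch` suffix marks it as an illustration, not the real class):

```java
/**
 * Sketch of the refactored comparison: one field-wise helper serves both
 * key-vs-key and key-vs-RecordIdentifier comparisons without allocating
 * or mutating anything.
 */
final class DeleteRecordKeySketch implements Comparable<DeleteRecordKeySketch> {
  private final long originalWriteId;
  private final int bucketProperty;
  private final long rowId;

  DeleteRecordKeySketch(long originalWriteId, int bucketProperty, long rowId) {
    this.originalWriteId = originalWriteId;
    this.bucketProperty = bucketProperty;
    this.rowId = rowId;
  }

  @Override
  public int compareTo(DeleteRecordKeySketch other) {
    return other == null ? -1
        : compareTo(other.originalWriteId, other.bucketProperty, other.rowId);
  }

  // The old code mutated a static scratch DeleteRecordKey here instead,
  // which is not safe if two readers compare concurrently.
  private int compareTo(long oWriteId, int oBucket, long oRowId) {
    if (originalWriteId != oWriteId) {
      return originalWriteId < oWriteId ? -1 : 1;
    }
    if (bucketProperty != oBucket) {
      return bucketProperty < oBucket ? -1 : 1;
    }
    return rowId != oRowId ? (rowId < oRowId ? -1 : 1) : 0;
  }

  public static void main(String[] args) {
    DeleteRecordKeySketch a = new DeleteRecordKeySketch(1L, 536870912, 5L);
    DeleteRecordKeySketch b = new DeleteRecordKeySketch(1L, 536870912, 7L);
    System.out.println(a.compareTo(b)); // -1: same writeId and bucket, smaller rowId
  }
}
```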