diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 1378a01f44..3113bef14d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -107,6 +107,7 @@
   private final ReadPipeline rp;
   private final ExecutorService executor;
   private final boolean isAcidScan;
+  private final boolean isAcidFormat;
 
   /**
    * Creates the record reader and checks the input-specific compatibility.
@@ -187,10 +188,15 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
 
     this.isVectorized = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
     if (isAcidScan) {
+      OrcSplit orcSplit = (OrcSplit) split;
       this.acidReader = new VectorizedOrcAcidRowBatchReader(
-          (OrcSplit) split, jobConf, Reporter.NULL, null, rbCtx, true);
+          orcSplit, jobConf, Reporter.NULL, null, rbCtx, true);
+      isAcidFormat = !orcSplit.isOriginal();
+    } else {
+      isAcidFormat = false;
     }
-    this.includes = new IncludesImpl(tableIncludedCols, isAcidScan, rbCtx,
+
+    this.includes = new IncludesImpl(tableIncludedCols, isAcidScan && isAcidFormat, rbCtx,
         schema, job, isAcidScan && acidReader.includeAcidColumns());
 
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
@@ -397,7 +403,7 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
       counters.incrWallClockCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
       return false;
     }
-    if (isAcidScan) {
+    if (isAcidFormat) {
       vrb.selectedInUse = true;//why?
       if (isVectorized) {
         // TODO: relying everywhere on the magical constants and columns being together means ACID
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 3eadc26a75..9d1f83dd40 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -273,38 +272,17 @@ public long getColumnarProjectionSize() {
 
   @Override
   public boolean canUseLlapIo(Configuration conf) {
-    final boolean hasDelta = deltas != null && !deltas.isEmpty();
-    final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
-    final boolean isVectorized = Utilities.getIsVectorized(conf);
-    Boolean isSplitUpdate = null;
-    if (isAcidRead) {
-      final AcidUtils.AcidOperationalProperties acidOperationalProperties
-          = AcidUtils.getAcidOperationalProperties(conf);
-      isSplitUpdate = acidOperationalProperties.isSplitUpdate();
-      // TODO: this is brittle. Who said everyone has to upgrade using upgrade process?
-      assert isSplitUpdate : "should be true in Hive 3.0";
-    }
-
-    if (isOriginal) {
-      if (!isAcidRead && !hasDelta) {
-        // Original scan only
-        return true;
+    if (AcidUtils.isFullAcidScan(conf)) {
+      if (HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED)
+          && HiveConf.getBoolVar(conf, ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+        boolean hasDeleteDelta = deltas != null && !deltas.isEmpty();
+        return VectorizedOrcAcidRowBatchReader.canUseLlapIoForAcid(this, hasDeleteDelta, conf);
+      } else {
+        return false;
       }
     } else {
-      boolean isAcidEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED);
-      if (isAcidEnabled && isAcidRead && hasBase && isVectorized) {
-        if (hasDelta) {
-          if (isSplitUpdate) {
-            // Base with delete deltas
-            return true;
-          }
-        } else {
-          // Base scan only
-          return true;
-        }
-      }
+      return true;
     }
-    return false;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 374b1058e4..f8d014387b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -272,7 +272,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte
      * isOriginal - don't have meta columns - nothing to skip
      * there no relevant delete events && ROW__ID is not needed higher up
      * (e.g. this is not a delete statement)*/
-    if (!isOriginal && deleteEventRegistry.isEmpty() && !rowIdProjected) {
+    if (deleteEventRegistry.isEmpty() && !rowIdProjected) {
       Path parent = orcSplit.getPath().getParent();
       while (parent != null && !rootPath.equals(parent)) {
         if (parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
@@ -749,13 +749,14 @@ dropped by the Reader (I guess because of orc.impl.SchemaEvolution)
    * @param hasDeletes - if there are any deletes that apply to this split
    * todo: HIVE-17944
    */
-  static boolean canUseLlapForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
+  static boolean canUseLlapIoForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) {
     if(!split.isOriginal()) {
       return true;
     }
     VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
     if(rbCtx == null) {
-      throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
+      //Could not create VectorizedRowBatchCtx for split.getPath();
+      return false;
     }
     return !needSyntheticRowIds(split.isOriginal(), hasDeletes, areRowIdsProjected(rbCtx));
   }
@@ -1266,7 +1267,6 @@ public void close() throws IOException {
    * A simple wrapper class to hold the (owid, bucketProperty, rowId) pair.
    */
   static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
-    private static final DeleteRecordKey otherKey = new DeleteRecordKey();
     private long originalWriteId;
     /**
      * see {@link BucketCodec}
@@ -1288,25 +1288,29 @@ public int compareTo(DeleteRecordKey other) {
       if (other == null) {
         return -1;
       }
-      if (originalWriteId != other.originalWriteId) {
-        return originalWriteId < other.originalWriteId ? -1 : 1;
-      }
-      if(bucketProperty != other.bucketProperty) {
-        return bucketProperty < other.bucketProperty ? -1 : 1;
-      }
-      if (rowId != other.rowId) {
-        return rowId < other.rowId ? -1 : 1;
-      }
-      return 0;
+      return compareTo(other.originalWriteId, other.bucketProperty, other.rowId);
     }
+
     private int compareTo(RecordIdentifier other) {
       if (other == null) {
         return -1;
       }
-      otherKey.set(other.getWriteId(), other.getBucketProperty(),
-          other.getRowId());
-      return compareTo(otherKey);
+      return compareTo(other.getWriteId(), other.getBucketProperty(), other.getRowId());
+    }
+
+    private int compareTo(long oOriginalWriteId, int oBucketProperty, long oRowId) {
+      if (originalWriteId != oOriginalWriteId) {
+        return originalWriteId < oOriginalWriteId ? -1 : 1;
+      }
+      if(bucketProperty != oBucketProperty) {
+        return bucketProperty < oBucketProperty ? -1 : 1;
+      }
+      if (rowId != oRowId) {
+        return rowId < oRowId ? -1 : 1;
+      }
+      return 0;
     }
+
     @Override
     public String toString() {
       return "DeleteRecordKey(" + originalWriteId + "," +
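
For context on the DeleteRecordKey change: both compareTo(DeleteRecordKey) and compareTo(RecordIdentifier) now delegate to a single private three-argument comparison, which is what allows the shared static scratch key (otherKey) to be removed. Below is a minimal standalone sketch of that comparison pattern; the class and field names are illustrative placeholders, not the Hive implementation, and the ordering (writeId, then bucketProperty, then rowId) is assumed from the hunk above.

  // Illustrative sketch only: orders keys by writeId, then bucketProperty, then rowId.
  final class DeleteKeySketch implements Comparable<DeleteKeySketch> {
    private final long writeId;
    private final int bucketProperty;
    private final long rowId;

    DeleteKeySketch(long writeId, int bucketProperty, long rowId) {
      this.writeId = writeId;
      this.bucketProperty = bucketProperty;
      this.rowId = rowId;
    }

    @Override
    public int compareTo(DeleteKeySketch other) {
      // Both entry points funnel into one field-by-field comparison, as in the patch.
      return compareTo(other.writeId, other.bucketProperty, other.rowId);
    }

    private int compareTo(long oWriteId, int oBucketProperty, long oRowId) {
      if (writeId != oWriteId) {
        return writeId < oWriteId ? -1 : 1;
      }
      if (bucketProperty != oBucketProperty) {
        return bucketProperty < oBucketProperty ? -1 : 1;
      }
      if (rowId != oRowId) {
        return rowId < oRowId ? -1 : 1;
      }
      return 0;
    }
  }

Routing every comparison through one helper keeps the DeleteRecordKey and RecordIdentifier orderings consistent and avoids mutating shared static state on each comparison.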