diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index a99b266..be92a9a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.llap.io.api.impl; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -101,6 +102,11 @@ @Override public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { + return getRecordReader(split, job, reporter, LowLevelCache.Priority.NORMAL); + } + + public RecordReader getRecordReader( + InputSplit split, JobConf job, Reporter reporter, LowLevelCache.Priority priority) throws IOException { RecordReader noLlap = checkLlapSplit(split, job, reporter); if (noLlap != null) return noLlap; @@ -112,7 +118,7 @@ List includedCols = ColumnProjectionUtils.isReadAllColumns(job) ? null : ColumnProjectionUtils.getReadColumnIDs(job); LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName, cvp, - executor, sourceInputFormat, sourceSerDe, reporter); + executor, sourceInputFormat, sourceSerDe, reporter, priority); if (!rr.init()) { return sourceInputFormat.getRecordReader(split, job, reporter); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 7c309a4..2da49aa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -23,14 +23,13 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.ObjectName; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; +import org.apache.hadoop.hive.llap.io.decode.OrcAcidColumnVectorProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -66,8 +65,6 @@ import org.apache.hadoop.metrics2.util.MBeans; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapIoImpl implements LlapIo { @@ -80,13 +77,16 @@ // TODO: later, we may have a map private final ColumnVectorProducer orcCvp, genericCvp; + private final ColumnVectorProducer acidOrcCvp; private final ExecutorService executor; private final LlapDaemonCacheMetrics cacheMetrics; private final LlapDaemonIOMetrics ioMetrics; private ObjectName buddyAllocatorMXBean; private final Allocator allocator; + private final Configuration conf; private LlapIoImpl(Configuration conf) throws IOException { + this.conf = conf; String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode); LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? 
LlapIoImpl.MODE_CACHE : "none"); @@ -162,6 +162,8 @@ private LlapIoImpl(Configuration conf) throws IOException { metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer( serdeCache, bufferManager, conf, cacheMetrics, ioMetrics) : null; + this.acidOrcCvp = new OrcAcidColumnVectorProducer( + metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); LOG.info("LLAP IO initialized"); registerMXBeans(); @@ -177,7 +179,12 @@ private void registerMXBeans() { InputFormat sourceInputFormat, Deserializer sourceSerDe) { ColumnVectorProducer cvp = genericCvp; if (sourceInputFormat instanceof OrcInputFormat) { - cvp = orcCvp; // Special-case for ORC. + OrcInputFormat orcInputFormat = (OrcInputFormat) sourceInputFormat; + if (orcInputFormat.isAcidRead(conf)) { + cvp = acidOrcCvp; // Special case for ACID ORC. + } else { + cvp = orcCvp; // Special case for non-ACID ORC. + } } else if (cvp == null) { LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass()); return null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index d4e14a8..4e625c5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.ConsumerFeedback; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; @@ -97,8 +98,8 @@ public LlapRecordReader(JobConf job, FileSplit split, List includedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, - InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter) - throws IOException, HiveException { + InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, + LowLevelCache.Priority priority) throws IOException, HiveException { this.executor = executor; this.jobConf = job; this.split = split; @@ -148,7 +149,7 @@ public LlapRecordReader(JobConf job, FileSplit split, List includedCols // Create the consumer of encoded data; it will coordinate decoding to CVBs. 
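Note on the LlapInputFormat and LlapIoImpl hunks above: getRecordReader() gains an overload that accepts a LowLevelCache.Priority, and ACID ORC reads are routed to the new OrcAcidColumnVectorProducer. A minimal sketch of how a caller might use the new overload follows; the wrapper class, method name and Reporter.NULL wiring are illustrative only, not part of the patch — the in-patch caller is OrcEncodedRawRecordMerger further down, which requests Priority.HIGH for its delete-delta side reads.

import java.io.IOException;

import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Illustrative caller of the new overload; names are made up for the sketch.
class PriorityReadSketch {
  static void openReaders(LlapInputFormat llapInputFormat, JobConf job) throws IOException {
    InputSplit[] splits = llapInputFormat.getSplits(job, 1);

    // Existing three-argument overload: behaviour is unchanged, file data is cached at NORMAL priority.
    RecordReader<NullWritable, VectorizedRowBatch> base =
        llapInputFormat.getRecordReader(splits[0], job, Reporter.NULL);

    // New overload: a side reader (e.g. for delete deltas) can ask for HIGH priority so that
    // the buffers it caches are the last to be evicted.
    RecordReader<NullWritable, VectorizedRowBatch> deleteDeltas =
        llapInputFormat.getRecordReader(splits[0], job, Reporter.NULL, LowLevelCache.Priority.HIGH);
  }
}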
feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters, schema, sourceInputFormat, sourceSerDe, reporter, job, - mapWork.getPathToPartitionInfo()); + mapWork.getPathToPartitionInfo(), priority); evolution = rp.getSchemaEvolution(); includedColumns = rp.getIncludedColumns(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java index d08dfbb..5786f75 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; @@ -43,5 +44,5 @@ ReadPipeline createReadPipeline(Consumer consumer, FileSplit List columnIds, SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, TypeDescription readerSchema, InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, - JobConf job, Map parts) throws IOException; + JobConf job, Map parts, LowLevelCache.Priority priority) throws IOException; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java index 0d9779f..99c3a64 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; @@ -79,8 +80,8 @@ public GenericColumnVectorProducer(SerDeLowLevelCacheImpl serdeCache, public ReadPipeline createReadPipeline(Consumer consumer, FileSplit split, List columnIds, SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, TypeDescription schema, InputFormat sourceInputFormat, - Deserializer sourceSerDe, Reporter reporter, JobConf job, Map parts) - throws IOException { + Deserializer sourceSerDe, Reporter reporter, JobConf job, Map parts, + LowLevelCache.Priority priority) throws IOException { cacheMetrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( consumer, columnIds.size(), false, counters, ioMetrics); @@ -94,7 +95,7 @@ public ReadPipeline createReadPipeline(Consumer consumer, Fil // Note that we pass job config to the record reader, but use global config for LLAP IO. 
SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache, bufferManager, conf, split, columnIds, edc, job, reporter, sourceInputFormat, - sourceSerDe, counters, fm.getSchema(), parts); + sourceSerDe, counters, fm.getSchema(), parts, priority); edc.init(reader, reader); if (LlapIoImpl.LOG.isDebugEnabled()) { LlapIoImpl.LOG.debug("Ignoring schema: " + schema); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java new file mode 100644 index 0000000..24f2b63 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.cache.BufferUsageManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; +import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * OrcAcidColumnVectorProducer produces a ReadPipeline of ORC ACID data for LLAP. 
+ */ +public class OrcAcidColumnVectorProducer implements ColumnVectorProducer { + + private final OrcMetadataCache metadataCache; + private final LowLevelCache baseCache; + private final BufferUsageManager bufferManager; + private final Configuration conf; + private boolean _skipCorrupt; // TODO: get rid of this + private LlapDaemonCacheMetrics cacheMetrics; + private LlapDaemonIOMetrics ioMetrics; + + public OrcAcidColumnVectorProducer( + OrcMetadataCache metadataCache, LowLevelCache baseCache, BufferUsageManager bufferManager, + Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) { + LlapIoImpl.LOG.info("Initializing ORC ACID column vector producer"); + + this.metadataCache = metadataCache; + this.baseCache = baseCache; + this.bufferManager = bufferManager; + this.conf = conf; + this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); + this.cacheMetrics = cacheMetrics; + this.ioMetrics = ioMetrics; + } + + @Override + public ReadPipeline createReadPipeline( + Consumer consumer, FileSplit split, List columnIds, + SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, + TypeDescription readerSchema, InputFormat inputFormat, Deserializer deserializer, + Reporter reporter, JobConf job, Map unused2, LowLevelCache.Priority priority) throws IOException { + cacheMetrics.incrCacheReadRequests(); + OrcEncodedDataConsumer edc; + if (VectorizedOrcAcidRowBatchReader.isAcid(job, split)) { + // If the split is ACID, then use ORC ACID consumer + LlapInputFormat llapInputFormat = (LlapInputFormat) + LlapProxy.getIo().getInputFormat(inputFormat, deserializer); + edc = new OrcAcidEncodedDataConsumer(consumer, columnIds.size(), + _skipCorrupt, counters, ioMetrics, job, split, llapInputFormat, reporter); + } else { + // If the split is non-ACID, then use ORC consumer + edc = new OrcEncodedDataConsumer( + consumer, columnIds.size(), _skipCorrupt, counters, ioMetrics); + } + // Note: we use global conf here and ignore JobConf. + OrcEncodedDataReader reader = new OrcEncodedDataReader(baseCache, bufferManager, + metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema, priority); + edc.init(reader, reader); + return edc; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java new file mode 100644 index 0000000..cd49d75 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.BitSet; + +import static org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.findRecordsWithInvalidTransactionIds; +import static org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit; + +/** + * OrcAcidEncodeDataConsumer consumes data after merging the base, delta, and delete delta. + */ +public class OrcAcidEncodedDataConsumer extends OrcEncodedDataConsumer implements ReadPipeline { + private final InnerConsumer innerConsumer = new InnerConsumer(); + private final JobConf conf; + private final FileSplit split; + private final LlapInputFormat llapInputFormat; + private final Reporter reporter; + + public OrcAcidEncodedDataConsumer( + Consumer consumer, int size, boolean skipCorrupt, + QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics, + JobConf conf, FileSplit split, LlapInputFormat llapInputFormat, Reporter reporter) throws IOException { + super(consumer, size, skipCorrupt, counters, ioMetrics); + this.split = split; + this.conf = conf; + this.llapInputFormat = llapInputFormat; + this.reporter = reporter; + } + + @Override + protected void decodeBatch(Reader.OrcEncodedColumnBatch batch, + Consumer downstreamConsumer) { + innerConsumer.downstreamConsumer = downstreamConsumer; + super.decodeBatch(batch, innerConsumer); + } + + private class InnerConsumer implements Consumer { + Consumer downstreamConsumer; + VectorizedOrcAcidRowBatchReader.DeleteEventRegistry deleteEventRegistry; + + InnerConsumer() { + // Clone readerOptions for deleteEvents. + Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf); + readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + Reader.Options deleteEventReaderOptions = readerOptions.clone(); + // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because + // we always want to read all the delete delta files. + deleteEventReaderOptions.range(0, Long.MAX_VALUE); + // Disable SARGs for deleteEventReaders, as SARGs have no meaning. 
+ deleteEventReaderOptions.searchArgument(null, null); + OrcSplit orcSplit = (OrcSplit) split; + + try { + // Create a set of hanging readers that do sort-merge to find the next smallest + // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). + deleteEventRegistry = new SortMergedDeleteEventRegistry( + conf, orcSplit, deleteEventReaderOptions); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void consumeData(ColumnVectorBatch data) { + BitSet selectedBitSet = new BitSet(data.size); + selectedBitSet.set(0, data.size); // Start with every row selected; the checks below clear invalid or deleted rows. + + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + ValidTxnList validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(data.cols, data.size, selectedBitSet, validTxnList); + + // Case 2- find rows which have been deleted. + try { + deleteEventRegistry.findDeletedRecords(data.cols, data.size, selectedBitSet); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Keep only the rows that survived both checks, compacting the batch in place. + int cardinality = selectedBitSet.cardinality(); + if (cardinality != data.size) { + data.size = cardinality; + for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0; + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) { + for (ColumnVector columnVector : data.cols) { + columnVector.setElement(selectedItr, setBitIndex, columnVector); + } + } + } + + downstreamConsumer.consumeData(data); + } + + @Override + public void setDone() { + downstreamConsumer.setDone(); + try { + deleteEventRegistry.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void setError(Throwable t) { + downstreamConsumer.setError(t); + try { + deleteEventRegistry.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private class SortMergedDeleteEventRegistry + implements VectorizedOrcAcidRowBatchReader.DeleteEventRegistry { + + private OrcRawRecordMerger deleteRecords; + private OrcRawRecordMerger.ReaderKey deleteRecordKey; + private OrcStruct deleteRecordValue; + private boolean isDeleteRecordAvailable = true; + private ValidTxnList validTxnList; + + public SortMergedDeleteEventRegistry( + JobConf conf, OrcSplit orcSplit, + org.apache.hadoop.hive.ql.io.orc.Reader.Options readerOptions) + throws IOException { + + final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); + if (deleteDeltas.length > 0) { + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + this.deleteRecords = new OrcEncodedRawRecordMerger(conf, true, null, false, bucket, + validTxnList, readerOptions, deleteDeltas, llapInputFormat, reporter, LowLevelCache.Priority.HIGH); + this.deleteRecordKey = new OrcEncodedRawRecordMerger.ReaderKey(); + this.deleteRecordValue = this.deleteRecords.createValue(); + // Initialize the first value in the delete reader.
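The consumeData() implementation above filters a ColumnVectorBatch through a BitSet (invalid transactions first, then delete events) and finally compacts the surviving rows to the front of every column vector. The stand-alone sketch below restates only that compaction step with more descriptive index names; the class and method names are mine, while the cols/size fields and the ColumnVector.setElement(dst, src, source) usage follow the patch.

import java.util.BitSet;

import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;

// Stand-alone restatement of the compaction step above (illustrative names).
final class BatchCompactionSketch {
  static void compact(ColumnVectorBatch data, BitSet selected) {
    int surviving = selected.cardinality();
    if (surviving == data.size) {
      return; // nothing was filtered out; leave the batch as-is
    }
    data.size = surviving;
    // Walk the bits that are still set (rows to keep) and copy each kept row into the next free slot.
    for (int src = selected.nextSetBit(0), dst = 0; src >= 0;
        src = selected.nextSetBit(src + 1), ++dst) {
      for (ColumnVector cv : data.cols) {
        cv.setElement(dst, src, cv); // copy row 'src' of this column into position 'dst'
      }
    }
  }
}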
+ this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue); + } else { + this.isDeleteRecordAvailable = false; + this.deleteRecordKey = null; + this.deleteRecordValue = null; + this.deleteRecords = null; + } + } + + @Override + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) + throws IOException { + if (!isDeleteRecordAvailable) { + return; + } + + long[] originalTransaction = + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + long[] bucket = + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; + long[] rowId = + cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; + + // The following repeatedX values will be set, if any of the columns are repeating. + long repeatedOriginalTransaction = (originalTransaction != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + long repeatedBucket = (bucket != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; + long repeatedRowId = (rowId != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector[0]; + + + // Get the first valid row in the batch still available. + int firstValidIndex = selectedBitSet.nextSetBit(0); + if (firstValidIndex == -1) { + return; // Everything in the batch has already been filtered out. + } + RecordIdentifier firstRecordIdInBatch = + new RecordIdentifier( + originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction, + bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket, + rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); + + // Get the last valid row in the batch still available. + int lastValidIndex = selectedBitSet.previousSetBit(size - 1); + RecordIdentifier lastRecordIdInBatch = + new RecordIdentifier( + originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, + bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket, + rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId); + + // We must iterate over all the delete records, until we find one record with + // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records. + while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) { + isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); + if (!isDeleteRecordAvailable) return; // exhausted all delete records, return. + } + + // If we are here, then we have established that firstRecordInBatch <= deleteRecord. + // Now continue marking records which have been deleted until we reach the end of the batch + // or we exhaust all the delete records. + + int currIndex = firstValidIndex; + RecordIdentifier currRecordIdInBatch = new RecordIdentifier(); + while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) { + currRecordIdInBatch.setValues( + (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction, + (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket, + (rowId != null) ? rowId[currIndex] : repeatedRowId); + + if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) { + // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted. 
+ selectedBitSet.clear(currIndex); + currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. + } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) { + // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the + // next record in the batch. + // But before that, can we short-circuit and skip the entire batch itself + // by checking if the deleteRecordId > lastRecordInBatch? + if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) { + return; // Yay! We short-circuited, skip everything remaining in the batch and return. + } + currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. + } else { + // We have deleteRecordId < currRecordIdInBatch, we must now move on to find + // next the larger deleteRecordId that can possibly match anything in the batch. + isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); + } + } + } + + @Override + public void close() throws IOException { + if (this.deleteRecords != null) { + this.deleteRecords.close(); + } + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index ac031aa..9381614 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -75,13 +75,15 @@ public ReadPipeline createReadPipeline( Consumer consumer, FileSplit split, List columnIds, SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, TypeDescription readerSchema, InputFormat unused0, Deserializer unused1, - Reporter reporter, JobConf job, Map unused2) throws IOException { + Reporter reporter, JobConf job, Map unused2, + LowLevelCache.Priority priority) throws IOException { cacheMetrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(), _skipCorrupt, counters, ioMetrics); // Note: we use global conf here and ignore JobConf. 
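The findDeletedRecords() merge above works because both inputs are sorted by (originalTransaction, bucket, rowId): delete events come out of the OrcRawRecordMerger in key order, and the rows of an ORC ACID batch are laid out in the same order. Below is a simplified, non-columnar restatement of that two-cursor merge; the names are mine and it compares RecordIdentifier values directly instead of going through ReaderKey.compareRow().

import java.util.BitSet;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

// Simplified sketch: one forward pass over the sorted delete events and one over the
// sorted, still-selected rows of the batch.
final class DeleteMergeSketch {
  static void markDeleted(List<RecordIdentifier> batchRows,      // batch rows, in key order
      Iterator<RecordIdentifier> deleteEvents,                   // delete events, in key order
      BitSet selected) {
    if (!deleteEvents.hasNext()) {
      return;
    }
    RecordIdentifier delete = deleteEvents.next();
    for (int i = selected.nextSetBit(0); i >= 0; ) {
      int cmp = delete.compareTo(batchRows.get(i));
      if (cmp == 0) {
        selected.clear(i);                 // exact key match: this row was deleted
        i = selected.nextSetBit(i + 1);
      } else if (cmp > 0) {
        i = selected.nextSetBit(i + 1);    // delete event is past this row: the row survives
      } else if (deleteEvents.hasNext()) {
        delete = deleteEvents.next();      // delete event is behind: advance it
      } else {
        return;                            // no more delete events; the remaining rows survive
      }
    }
  }
}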
- OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, - metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema); + OrcEncodedDataReader reader = new OrcEncodedDataReader( + lowLevelCache, bufferManager, metadataCache, conf, split, columnIds, + sarg, columnNames, edc, counters, readerSchema, priority); edc.init(reader, reader); return edc; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedRawRecordMerger.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedRawRecordMerger.java new file mode 100644 index 0000000..6386cc9 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedRawRecordMerger.java @@ -0,0 +1,251 @@ +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class OrcEncodedRawRecordMerger extends OrcRawRecordMerger { + + private static final Logger LOG = LoggerFactory.getLogger(OrcEncodedRawRecordMerger.class); + + private final LlapInputFormat llapInputFormat; + private final JobConf job; + private final Reporter reporter; + private final LowLevelCache.Priority priority; + + /** + * Create a reader that merge sorts the ACID events together. 
+ * + * @param conf the configuration + * @param collapseEvents should the events on the same row be collapsed + * @param reader + * @param isOriginal is the base file a pre-acid file + * @param bucket the bucket we are reading + * @param validTxnList + * @param options the options to read with + * @param deltaDirectory the list of delta directories to include @throws IOException + */ + public OrcEncodedRawRecordMerger( + JobConf conf, boolean collapseEvents, Reader reader, boolean isOriginal, int bucket, + ValidTxnList validTxnList, Reader.Options options, Path[] deltaDirectory, + LlapInputFormat llapInputFormat, Reporter reporter, LowLevelCache.Priority priority) + throws IOException { + super(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, options, deltaDirectory); + this.llapInputFormat = llapInputFormat; + this.job = conf; + this.reporter = reporter; + this.priority = priority; + } + + @Override + protected IReaderPair createOriginalReaderPair( + ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, + org.apache.orc.Reader.Options options) throws IOException { + return new OriginalEncodedReaderPair(key, reader, bucket, minKey, maxKey, options); + } + + @Override + protected IReaderPair createReaderPair( + ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, + org.apache.orc.Reader.Options options, int statementId) throws IOException { + return new EncodedReaderPair(key, reader, bucket, minKey, maxKey, options, statementId); + } + + private class EncodedReaderPair implements IReaderPair { + OrcStruct nextRecord; + int indexInBatch = 0; + final org.apache.hadoop.hive.ql.io.orc.encoded.Reader reader; + final org.apache.hadoop.mapred.RecordReader recordReader; + final ReaderKey key; + final RecordIdentifier maxKey; + final int bucket; + final int statementId; + final VectorizedRowBatch batch; + final NullWritable nullWritable = NullWritable.get(); + final IntWritable otWritable = new IntWritable(); + final LongWritable bucketWritable = new LongWritable(); + final LongWritable ctWritable = new LongWritable(); + + public EncodedReaderPair( + ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, + org.apache.orc.Reader.Options options, int statementId) throws IOException { + this.reader = (org.apache.hadoop.hive.ql.io.orc.encoded.Reader) reader; + this.key = key; + this.maxKey = maxKey; + this.bucket = bucket; + // TODO use stripe statistics to jump over stripes + InputSplit[] splits = llapInputFormat.getSplits(job, 1); + recordReader = llapInputFormat.getRecordReader(splits[0], job, reporter, priority); + + this.statementId = statementId; + // advance the reader until we reach the minimum key + do { + next(nextRecord); + } while (nextRecord != null && + (minKey != null && key.compareRow(minKey) <= 0)); + + batch = new VectorizedRowBatch(getColumns()); + } + + @Override + public void next(OrcStruct next) throws IOException { + if (indexInBatch >= batch.size) { + if (recordReader.next(nullWritable, batch)) { + indexInBatch = 0; + } else { + nextRecord = null; + recordReader.close(); + return; + } + } + + setOrcStruct(next); + nextRecord = next; + // set the key + key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord), + OrcRecordUpdater.getBucket(nextRecord), + OrcRecordUpdater.getRowId(nextRecord), + OrcRecordUpdater.getCurrentTransaction(nextRecord), + statementId); + + // if this record is larger than maxKey, we need to stop + if (maxKey != null 
&& key.compareRow(maxKey) > 0) { + LOG.debug("key " + key + " > maxkey " + maxKey); + nextRecord = null; + recordReader.close(); + } + + indexInBatch++; + } + + void setOrcStruct(OrcStruct next) { + otWritable.set((int) ((LongColumnVector) + batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[indexInBatch]); + bucketWritable.set(((LongColumnVector) + batch.cols[OrcRecordUpdater.BUCKET]).vector[indexInBatch]); + ctWritable.set(((LongColumnVector) + batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[indexInBatch]); + + next.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, otWritable); + next.setFieldValue(OrcRecordUpdater.BUCKET, bucketWritable); + next.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, ctWritable); + } + + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public OrcStruct getNextRecord() { + return nextRecord; + } + + @Override + public ReaderKey getKey() { + return key; + } + + @Override + public int getColumns() { + return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount(); + } + + @Override + public float getProgress() throws IOException { + return recordReader.getProgress(); + } + } + + private class OriginalEncodedReaderPair extends EncodedReaderPair implements IReaderPair { + public OriginalEncodedReaderPair( + ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, + org.apache.orc.Reader.Options options) throws IOException { + super(key, reader, bucket, minKey, maxKey, options, 0); + } + + @Override + public void next(OrcStruct next) throws IOException { + if (indexInBatch >= batch.size) { + if (recordReader.next(nullWritable, batch)) { + indexInBatch = 0; + } else { + nextRecord = null; + recordReader.close(); + return; + } + } + + long nextRowId = recordReader.getPos(); + // have to do initialization here, because the super's constructor + // calls next and thus we need to initialize before our constructor + // runs + if (next == null) { + nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); + IntWritable operation = + new IntWritable(OrcRecordUpdater.INSERT_OPERATION); + nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation); + nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, + new LongWritable(0)); + nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, + new LongWritable(0)); + nextRecord.setFieldValue(OrcRecordUpdater.BUCKET, + new IntWritable(bucket)); + nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, + new LongWritable(nextRowId)); + next = new OrcStruct(OrcRecordUpdater.FIELDS); + setOrcStruct(next); + nextRecord.setFieldValue(OrcRecordUpdater.ROW, next); + } else { + nextRecord = next; + ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) + .set(OrcRecordUpdater.INSERT_OPERATION); + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) + .set(0); + ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) + .set(bucket); + ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) + .set(0); + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) + .set(nextRowId); + setOrcStruct(next); + nextRecord.setFieldValue(OrcRecordUpdater.ROW, OrcRecordUpdater.getRow(next)); + } + key.setValues(0L, bucket, nextRowId, 0L, 0); + if (maxKey != null && key.compareRow(maxKey) > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("key " + key + " > maxkey " + maxKey); + } + nextRecord = null; + recordReader.close(); + } + + indexInBatch++; + } + + @Override + public int 
getColumns() { + return reader.getTypes().get(0).getSubtypesCount(); + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 6554fa2..e41b122 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -145,6 +145,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private final QueryFragmentCounters counters; private final UserGroupInformation ugi; private final SchemaEvolution evolution; + private final Priority priority; // Read state. private int stripeIxFrom; @@ -169,7 +170,8 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager, OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List columnIds, SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer, - QueryFragmentCounters counters, TypeDescription readerSchema) throws IOException { + QueryFragmentCounters counters, TypeDescription readerSchema, Priority priority) + throws IOException { this.lowLevelCache = lowLevelCache; this.metadataCache = metadataCache; this.bufferManager = bufferManager; @@ -183,6 +185,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff this.columnNames = columnNames; this.consumer = consumer; this.counters = counters; + this.priority = priority; try { this.ugi = UserGroupInformation.getCurrentUser(); } catch (IOException e) { @@ -846,7 +849,7 @@ public DiskRangeList getFileData(Object fileKey, DiskRangeList range, MemoryBuffer[] data, long baseOffset) { if (data != null) { return lowLevelCache.putFileData( - fileKey, ranges, data, baseOffset, Priority.NORMAL, counters); + fileKey, ranges, data, baseOffset, priority, counters); } else if (metadataCache != null) { metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 221c99e..289f74b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -166,6 +166,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final int allocSize; private final int targetSliceRowCount; private final boolean isLrrEnabled; + private final Priority priority; private final boolean[] writerIncludes; private FileReaderYieldReturn currentFileRead = null; @@ -182,7 +183,8 @@ public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache, BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split, List columnIds, OrcEncodedDataConsumer consumer, JobConf jobConf, Reporter reporter, InputFormat sourceInputFormat, Deserializer sourceSerDe, - QueryFragmentCounters counters, TypeDescription schema, Map parts) + QueryFragmentCounters counters, TypeDescription schema, Map parts, + Priority priority) throws IOException { this.cache = cache; this.bufferManager = bufferManager; @@ -198,6 +200,7 @@ public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache, this.targetSliceRowCount = HiveConf.getIntVar( sliceConf, ConfVars.LLAP_IO_ENCODE_SLICE_ROW_COUNT); this.isLrrEnabled = 
HiveConf.getBoolVar(sliceConf, ConfVars.LLAP_IO_ENCODE_SLICE_LRR); + this.priority = priority; if (this.columnIds != null) { Collections.sort(this.columnIds); } @@ -725,7 +728,7 @@ public void cacheFileData(StripeData sd) { } FileData fd = new FileData(fileKey, encodings.length); fd.addStripe(sd); - cache.putFileData(fd, Priority.NORMAL, counters); + cache.putFileData(fd, priority, counters); } else { lockAllBuffers(sd); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 369584b..c6dd270 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -210,6 +210,13 @@ public boolean isAcidRead(Configuration conf, InputSplit inputSplit) { return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); } + public boolean isAcidRead(Configuration conf) { + /* + * Fallback for the case when OrcSplit flags do not contain hasBase and deltas + */ + return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); + } + private static class OrcRecordReader implements org.apache.hadoop.mapred.RecordReader, StatsProvidingRecordReader { @@ -1844,8 +1851,7 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte reporter.setStatus(inputSplit.toString()); - boolean isFastVectorizedReaderAvailable = - VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit); + boolean isFastVectorizedReaderAvailable = VectorizedOrcAcidRowBatchReader.isAcid(conf, inputSplit); if (vectorMode && isFastVectorizedReaderAvailable) { // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching. @@ -2030,7 +2036,7 @@ static Path findOriginalBucket(FileSystem fs, directory); } - static Reader.Options createOptionsForReader(Configuration conf) { + public static Reader.Options createOptionsForReader(Configuration conf) { /** * Do we have schema on read in the configuration variables? */ diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806..3d5dd20 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; @@ -53,7 +54,6 @@ private final Configuration conf; private final boolean collapse; - private final RecordReader baseReader; private final ObjectInspector objectInspector; private final long offset; private final long length; @@ -177,13 +177,23 @@ public String toString() { } } + protected interface IReaderPair { + void next(OrcStruct next) throws IOException; + void close() throws IOException; + + OrcStruct getNextRecord(); + ReaderKey getKey(); + int getColumns(); + float getProgress() throws IOException; + } + /** * A reader and the next record from that reader. The code reads ahead so that * we can return the lowest ReaderKey from each of the readers. Thus, the * next available row is nextRecord and only following records are still in * the reader. 
*/ - static class ReaderPair { + static class ReaderPair implements IReaderPair { OrcStruct nextRecord; final Reader reader; final RecordReader recordReader; @@ -222,7 +232,8 @@ public String toString() { (minKey != null && key.compareRow(minKey) <= 0)); } - void next(OrcStruct next) throws IOException { + @Override + public void next(OrcStruct next) throws IOException { if (recordReader.hasNext()) { nextRecord = (OrcStruct) recordReader.next(next); // set the key @@ -244,9 +255,30 @@ void next(OrcStruct next) throws IOException { } } - int getColumns() { + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public OrcStruct getNextRecord() { + return nextRecord; + } + + @Override + public ReaderKey getKey() { + return key; + } + + @Override + public int getColumns() { return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount(); } + + @Override + public float getProgress() throws IOException { + return recordReader.getProgress(); + } } /** @@ -254,7 +286,7 @@ int getColumns() { * It wraps the underlying reader's row with an ACID event object and * makes the relevant translations. */ - static final class OriginalReaderPair extends ReaderPair { + static final class OriginalReaderPair extends ReaderPair implements IReaderPair { OriginalReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, Reader.Options options) throws IOException { @@ -262,7 +294,7 @@ int getColumns() { } @Override - void next(OrcStruct next) throws IOException { + public void next(OrcStruct next) throws IOException { if (recordReader.hasNext()) { long nextRowId = recordReader.getRowNumber(); // have to do initialization here, because the super's constructor @@ -313,16 +345,16 @@ void next(OrcStruct next) throws IOException { } @Override - int getColumns() { + public int getColumns() { return reader.getTypes().get(0).getSubtypesCount(); } } - private final TreeMap readers = - new TreeMap(); + private final TreeMap readers = + new TreeMap(); // The reader that currently has the lowest key. - private ReaderPair primary; + private IReaderPair primary; // The key of the next lowest reader. 
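The OrcRawRecordMerger changes that start here extract an IReaderPair interface from ReaderPair and, a bit further down, add protected createReaderPair()/createOriginalReaderPair() factories plus a protected constructor. That is the extension point OrcEncodedRawRecordMerger uses to substitute LLAP-backed pairs. A hypothetical subclass sketch of the same hook follows; the class name and the logging are mine, and it assumes the class lives in the org.apache.hadoop.hive.ql.io.orc package so that the constructor and factory signatures line up with the ones made protected in this patch.

package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical subclass: delegates to the default factory and only logs what was built.
// OrcEncodedRawRecordMerger overrides the same hook to return encoded (LLAP-backed) pairs.
public class AuditingRawRecordMerger extends OrcRawRecordMerger {
  private static final Logger LOG = LoggerFactory.getLogger(AuditingRawRecordMerger.class);

  protected AuditingRawRecordMerger(Configuration conf, boolean collapseEvents, Reader reader,
      boolean isOriginal, int bucket, ValidTxnList validTxnList, Reader.Options options,
      Path[] deltaDirectory) throws IOException {
    super(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, options, deltaDirectory);
  }

  @Override
  protected IReaderPair createReaderPair(ReaderKey key, Reader reader, int bucket,
      RecordIdentifier minKey, RecordIdentifier maxKey,
      org.apache.orc.Reader.Options options, int statementId) throws IOException {
    IReaderPair pair = super.createReaderPair(key, reader, bucket, minKey, maxKey, options, statementId);
    LOG.debug("Built reader pair for bucket {} (statement {})", bucket, statementId);
    return pair;
  }
}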
private ReaderKey secondaryKey = null; @@ -399,7 +431,7 @@ private void discoverKeyBounds(Reader reader, * @param options options for the row reader * @return a cloned options object that is modified for the event reader */ - static Reader.Options createEventOptions(Reader.Options options) { + public static Reader.Options createEventOptions(Reader.Options options) { Reader.Options result = options.clone(); result.range(options.getOffset(), Long.MAX_VALUE); result.include(options.getInclude()); @@ -426,7 +458,7 @@ private void discoverKeyBounds(Reader reader, * @param deltaDirectory the list of delta directories to include * @throws IOException */ - OrcRawRecordMerger(Configuration conf, + protected OrcRawRecordMerger(Configuration conf, boolean collapseEvents, Reader reader, boolean isOriginal, @@ -448,9 +480,7 @@ private void discoverKeyBounds(Reader reader, // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); - if (reader == null) { - baseReader = null; - } else { + if (reader != null) { // find the min/max based on the offset and length if (isOriginal) { @@ -460,22 +490,19 @@ private void discoverKeyBounds(Reader reader, } LOG.info("min key = " + minKey + ", max key = " + maxKey); // use the min/max instead of the byte range - ReaderPair pair; + IReaderPair pair; ReaderKey key = new ReaderKey(); if (isOriginal) { options = options.clone(); - pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey, - options); + pair = createOriginalReaderPair(key, reader, bucket, minKey, maxKey, options); } else { - pair = new ReaderPair(key, reader, bucket, minKey, maxKey, - eventOptions, 0); + pair = createReaderPair(key, reader, bucket, minKey, maxKey, eventOptions, 0); } // if there is at least one record, put it in the map - if (pair.nextRecord != null) { + if (pair.getNextRecord() != null) { readers.put(key, pair); } - baseReader = pair.recordReader; } // we always want to read all of the deltas @@ -488,7 +515,7 @@ private void discoverKeyBounds(Reader reader, FileSystem fs = deltaFile.getFileSystem(conf); long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile); if (length != -1 && fs.exists(deltaFile)) { - Reader deltaReader = OrcFile.createReader(deltaFile, + Reader deltaReader = EncodedOrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); Reader.Options deltaEventOptions = null; if(eventOptions.getSearchArgument() != null) { @@ -501,10 +528,10 @@ private void discoverKeyBounds(Reader reader, deltaEventOptions = eventOptions.clone().searchArgument(null, null); } } - ReaderPair deltaPair; - deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, + IReaderPair deltaPair; + deltaPair = createReaderPair(key, deltaReader, bucket, minKey, maxKey, deltaEventOptions != null ? 
deltaEventOptions : eventOptions, deltaDir.getStatementId()); - if (deltaPair.nextRecord != null) { + if (deltaPair.getNextRecord() != null) { readers.put(key, deltaPair); } } @@ -512,7 +539,7 @@ private void discoverKeyBounds(Reader reader, } // get the first record - Map.Entry entry = readers.pollFirstEntry(); + Map.Entry entry = readers.pollFirstEntry(); if (entry == null) { columns = 0; primary = null; @@ -528,6 +555,20 @@ private void discoverKeyBounds(Reader reader, } } + protected IReaderPair createOriginalReaderPair( + ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, + org.apache.orc.Reader.Options options) throws IOException { + return new OriginalReaderPair(key, reader, bucket, minKey, maxKey, options); + } + + protected IReaderPair createReaderPair( + ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, + org.apache.orc.Reader.Options options, int statementId) throws IOException { + return new ReaderPair(key, reader, bucket, minKey, maxKey, options, statementId); + } + @VisibleForTesting RecordIdentifier getMinKey() { return minKey; @@ -539,12 +580,12 @@ RecordIdentifier getMaxKey() { } @VisibleForTesting - ReaderPair getCurrentReader() { + IReaderPair getCurrentReader() { return primary; } @VisibleForTesting - Map getOtherReaders() { + Map getOtherReaders() { return readers; } @@ -555,8 +596,8 @@ public boolean next(RecordIdentifier recordIdentifier, while (keysSame && primary != null) { // The primary's nextRecord is the next value to return - OrcStruct current = primary.nextRecord; - recordIdentifier.set(primary.key); + OrcStruct current = primary.getNextRecord(); + recordIdentifier.set(primary.getKey()); // Advance the primary reader to the next record primary.next(extraValue); @@ -567,16 +608,16 @@ public boolean next(RecordIdentifier recordIdentifier, // now that the primary reader has advanced, we need to see if we // continue to read it or move to the secondary. - if (primary.nextRecord == null || - primary.key.compareTo(secondaryKey) > 0) { + if (primary.getNextRecord() == null || + primary.getKey().compareTo(secondaryKey) > 0) { // if the primary isn't done, push it back into the readers - if (primary.nextRecord != null) { - readers.put(primary.key, primary); + if (primary.getNextRecord() != null) { + readers.put(primary.getKey(), primary); } // update primary and secondaryKey - Map.Entry entry = readers.pollFirstEntry(); + Map.Entry entry = readers.pollFirstEntry(); if (entry != null) { primary = entry.getValue(); if (readers.isEmpty()) { @@ -639,16 +680,16 @@ public long getPos() throws IOException { @Override public void close() throws IOException { if (primary != null) { - primary.recordReader.close(); + primary.close(); } - for(ReaderPair pair: readers.values()) { - pair.recordReader.close(); + for(IReaderPair pair: readers.values()) { + pair.close(); } } @Override public float getProgress() throws IOException { - return baseReader == null ? 1 : baseReader.getProgress(); + return primary == null ? 
1 : primary.getProgress(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 65f4a24..b86547b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -62,17 +62,17 @@ public static final int ORC_ACID_VERSION = 0; - final static int INSERT_OPERATION = 0; - final static int UPDATE_OPERATION = 1; - final static int DELETE_OPERATION = 2; - - final static int OPERATION = 0; - final static int ORIGINAL_TRANSACTION = 1; - final static int BUCKET = 2; - final static int ROW_ID = 3; - final static int CURRENT_TRANSACTION = 4; - final static int ROW = 5; - final static int FIELDS = 6; + public final static int INSERT_OPERATION = 0; + public final static int UPDATE_OPERATION = 1; + public final static int DELETE_OPERATION = 2; + + public final static int OPERATION = 0; + public final static int ORIGINAL_TRANSACTION = 1; + public final static int BUCKET = 2; + public final static int ROW_ID = 3; + public final static int CURRENT_TRANSACTION = 4; + public final static int ROW = 5; + public final static int FIELDS = 6; final static int DELTA_BUFFER_SIZE = 16 * 1024; final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024; @@ -116,23 +116,23 @@ static int getOperation(OrcStruct struct) { return ((IntWritable) struct.getFieldValue(OPERATION)).get(); } - static long getCurrentTransaction(OrcStruct struct) { + public static long getCurrentTransaction(OrcStruct struct) { return ((LongWritable) struct.getFieldValue(CURRENT_TRANSACTION)).get(); } - static long getOriginalTransaction(OrcStruct struct) { + public static long getOriginalTransaction(OrcStruct struct) { return ((LongWritable) struct.getFieldValue(ORIGINAL_TRANSACTION)).get(); } - static int getBucket(OrcStruct struct) { + public static int getBucket(OrcStruct struct) { return ((IntWritable) struct.getFieldValue(BUCKET)).get(); } - static long getRowId(OrcStruct struct) { + public static long getRowId(OrcStruct struct) { return ((LongWritable) struct.getFieldValue(ROW_ID)).get(); } - static OrcStruct getRow(OrcStruct struct) { + public static OrcStruct getRow(OrcStruct struct) { if (struct == null) { return null; } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index d61b24b..eb6231f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -218,7 +218,7 @@ public long getColumnarProjectionSize() { @Override public boolean canUseLlapIo() { - return isOriginal && (deltas == null || deltas.isEmpty()); + return isOriginal; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java index d48cadd..606c55b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java @@ -48,15 +48,15 @@ private Object[] fields; - OrcStruct(int children) { + public OrcStruct(int children) { fields = new Object[children]; } - Object getFieldValue(int fieldIndex) { + public Object getFieldValue(int fieldIndex) { return fields[fieldIndex]; } - void setFieldValue(int fieldIndex, Object value) { + public void setFieldValue(int fieldIndex, Object value) { fields[fieldIndex] = value; } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680..3eba09b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.BitSet; -import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; @@ -32,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -42,15 +42,13 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; + +import com.google.common.annotations.VisibleForTesting; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; /** * A fast vectorized batch reader class for ACID when split-update behavior is enabled. * When split-update is turned on, row-by-row stitching could be avoided to create the final @@ -147,7 +145,7 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, * @param inputSplit * @return true if it is possible, else false. */ - public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) { + public static boolean isAcid(JobConf conf, InputSplit inputSplit) { if (!(inputSplit instanceof OrcSplit)) { return false; // must be an instance of OrcSplit. } @@ -163,7 +161,7 @@ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, return false; // no split-update or possibly reading originals! } - private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + public static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { Path path = orcSplit.getPath(); Path root; if (orcSplit.hasBase()) { @@ -221,10 +219,12 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + findRecordsWithInvalidTransactionIds( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet, validTxnList); // Case 2- find rows which have been deleted. - this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet); + this.deleteEventRegistry.findDeletedRecords( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { // None of the cases above matched and everything is selected. 
Hence, we will use the @@ -257,19 +257,20 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti return true; } - private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { - if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { + public static void findRecordsWithInvalidTransactionIds( + ColumnVector[] cols, int size, BitSet selectedBitSet, ValidTxnList validTxnList) { + if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { // When we have repeating values, we can unset the whole bitset at once // if the repeating value is not a valid transaction. long currentTransactionIdForBatch = ((LongColumnVector) - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; + cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { - selectedBitSet.clear(0, batch.size); + selectedBitSet.clear(0, size); } return; } long[] currentTransactionVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; // Loop through the bits that are set to true and mark those rows as false, if their // current transactions are not valid. for (int setBitIndex = selectedBitSet.nextSetBit(0); @@ -278,7 +279,7 @@ private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitS if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) { selectedBitSet.clear(setBitIndex); } - } + } } @Override @@ -321,22 +322,24 @@ DeleteEventRegistry getDeleteEventRegistry() { * will read the delete delta files and will create their own internal * data structures to maintain record ids of the records that got deleted. */ - static interface DeleteEventRegistry { + public interface DeleteEventRegistry { /** * Modifies the passed bitset to indicate which of the rows in the batch * have been deleted. Assumes that the batch.size is equal to bitset size. - * @param batch + * @param cols + * @param size * @param selectedBitSet * @throws IOException */ - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException; + public void findDeletedRecords( + ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException; /** * The close() method can be called externally to signal the implementing classes * to free up resources. * @throws IOException */ - public void close() throws IOException; + void close() throws IOException; } /** @@ -346,7 +349,7 @@ DeleteEventRegistry getDeleteEventRegistry() { * amount of memory usage, given the number of delete delta files. Therefore, this * implementation will be picked up when the memory pressure is high. */ - static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { + public static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { private OrcRawRecordMerger deleteRecords; private OrcRawRecordMerger.ReaderKey deleteRecordKey; private OrcStruct deleteRecordValue; @@ -375,29 +378,29 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (!isDeleteRecordAvailable) { return; } long[] originalTransaction = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 
null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long[] bucket = - batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector; + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; long[] rowId = - batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; // The following repeatedX values will be set, if any of the columns are repeating. long repeatedOriginalTransaction = (originalTransaction != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long repeatedBucket = (bucket != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; long repeatedRowId = (rowId != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector[0]; // Get the first valid row in the batch still available. @@ -412,7 +415,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); // Get the last valid row in the batch still available. - int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1); + int lastValidIndex = selectedBitSet.previousSetBit(size - 1); RecordIdentifier lastRecordIdInBatch = new RecordIdentifier( originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, @@ -482,7 +485,7 @@ public void close() throws IOException { * heuristic that prevents creation of an instance of this class if the memory pressure is high. * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios. */ - static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { + public static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** * A simple wrapper class to hold the (otid, rowId) pair. */ @@ -626,8 +629,9 @@ public int compareTo(CompressedOtid other) { private CompressedOtid compressedOtids[]; private ValidTxnList validTxnList; - public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, - Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + public ColumnizedDeleteEventRegistry( + JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) + throws IOException, DeleteEventsOverflowMemoryException { int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? 
new ValidReadTxnList() : new ValidReadTxnList(txnString); @@ -775,7 +779,7 @@ private boolean isDeleted(long otid, long rowId) { } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (rowIds == null || compressedOtids == null) { return; @@ -784,13 +788,13 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) // check if it is deleted or not. long[] originalTransactionVector = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long[] rowIdVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; for (int setBitIndex = selectedBitSet.nextSetBit(0); setBitIndex >= 0; @@ -801,7 +805,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) if (isDeleted(otid, rowId)) { selectedBitSet.clear(setBitIndex); } - } + } } @Override @@ -816,7 +820,7 @@ public void close() throws IOException { } } - static class DeleteEventsOverflowMemoryException extends Exception { + public static class DeleteEventsOverflowMemoryException extends Exception { private static final long serialVersionUID = 1L; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 2120400..2d4d7d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -258,7 +258,7 @@ public void deriveExplainAttributes() { } public void deriveLlap(Configuration conf, boolean isExecDriver) { - boolean hasLlap = false, hasNonLlap = false, hasAcid = false; + boolean hasLlap = false, hasNonLlap = false; // Assume the IO is enabled on the daemon by default. We cannot reasonably check it here. 
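For readers following the VectorizedOrcAcidRowBatchReader hunks above: a minimal sketch, not part of the patch, of how the refactored static helpers could be driven by a caller that has only the raw ACID metadata column vectors rather than an assembled VectorizedRowBatch (which appears to be the point of switching the signatures to ColumnVector[] plus a size). The cols, rowCount, txnString and registry parameters, and the class name, are assumed for illustration.

    import java.io.IOException;
    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
    import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.DeleteEventRegistry;

    public class AcidRowFilterSketch {
      /** Returns a bitset whose surviving bits mark the rows that should be emitted. */
      public static BitSet selectValidRows(ColumnVector[] cols, int rowCount,
          String txnString, DeleteEventRegistry registry) throws IOException {
        // Start with every row selected.
        BitSet selected = new BitSet(rowCount);
        selected.set(0, rowCount);

        ValidTxnList validTxnList =
            (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);

        // Case 1: clear rows whose currentTransaction is not valid (aborted/open).
        VectorizedOrcAcidRowBatchReader.findRecordsWithInvalidTransactionIds(
            cols, rowCount, selected, validTxnList);

        // Case 2: clear rows that have a matching delete event in the delete deltas.
        registry.findDeletedRecords(cols, rowCount, selected);

        return selected;
      }
    }
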
boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode); boolean canWrapAny = false, doCheckIfs = false; @@ -278,12 +278,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) { boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap( part.getInputFileFormatClass(), doCheckIfs); if (isUsingLlapIo) { - if (part.getTableDesc() != null && - AcidUtils.isTablePropertyTransactional(part.getTableDesc().getProperties())) { - hasAcid = true; - } else { - hasLlap = true; - } + hasLlap = true; } else { hasNonLlap = true; } @@ -296,7 +291,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) { } llapIoDesc = deriveLlapIoDescString( - isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid); + isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap); } private boolean checkVectorizerSupportedTypes(boolean hasLlap) { @@ -321,11 +316,10 @@ private boolean checkVectorizerSupportedTypes(boolean hasLlap) { } private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, - boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) { + boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap) { if (!isLlapOn) return null; // LLAP IO is off, don't output. if (!canWrapAny) return "no inputs"; // Cannot use with input formats. if (!hasPathToPartInfo) return "unknown"; // No information to judge. - if (hasAcid) return "may be used (ACID table)"; return (hasLlap ? (hasNonLlap ? "some inputs" : "all inputs") : "no inputs"); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index f07aa49..102ba5c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -423,7 +423,7 @@ public void testNewBase() throws Exception { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader, false, 10, createMaximalTxnList(), new Reader.Options().range(1000, 1000), null); - RecordReader rr = merger.getCurrentReader().recordReader; + RecordReader rr = ((ReaderPair) merger.getCurrentReader()).recordReader; assertEquals(0, merger.getOtherReaders().size()); assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf1312..1957f9d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -247,17 +247,17 @@ public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getLegacy().toInt()); // Test false when trying to create a vectorized ACID row batch reader for a legacy table. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertFalse(VectorizedOrcAcidRowBatchReader.isAcid(conf, mockSplit)); conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getDefault().toInt()); Mockito.when(mockSplit.isOriginal()).thenReturn(true); // Test false when trying to create a vectorized ACID row batch reader when reading originals. 
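Alongside the test changes here that exercise the rename, a small sketch of how the renamed isAcid() check and the now-public getDeleteDeltaDirsFromSplit() might be combined by an external caller. This is illustrative only; the class and method names below are hypothetical, not from the patch.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
    import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class AcidSplitInspector {
      /** Returns the delete-delta directories if this is a vectorizable ACID split, else null. */
      public static Path[] deleteDeltasIfAcid(JobConf conf, InputSplit split) throws IOException {
        if (!VectorizedOrcAcidRowBatchReader.isAcid(conf, split)) {
          return null; // legacy ACID layout, reading originals, or not an OrcSplit at all
        }
        OrcSplit orcSplit = (OrcSplit) split;
        // Each returned path is a delete_delta directory next to the split's base/delta.
        return VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit(orcSplit);
      }
    }
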
- assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertFalse(VectorizedOrcAcidRowBatchReader.isAcid(conf, mockSplit)); // A positive test case. Mockito.when(mockSplit.isOriginal()).thenReturn(false); - assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertTrue(VectorizedOrcAcidRowBatchReader.isAcid(conf, mockSplit)); } } diff --git ql/src/test/queries/clientpositive/llap_acid.q ql/src/test/queries/clientpositive/llap_acid.q index 6bd216a..41d86af 100644 --- ql/src/test/queries/clientpositive/llap_acid.q +++ ql/src/test/queries/clientpositive/llap_acid.q @@ -27,7 +27,7 @@ select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limi insert into table orc_llap partition (csmallint = 2) select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10; -alter table orc_llap SET TBLPROPERTIES ('transactional'='true'); +alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); insert into table orc_llap partition (csmallint = 3) select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10; diff --git ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out index d05bf64..e54fdec 100644 --- ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out +++ ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out @@ -76,7 +76,7 @@ STAGE PLANS: GatherStats: false MultiFileSpray: false Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Path -> Alias: #### A masked pattern was here #### Path -> Partition: diff --git ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out index 829f28f..abea2bb 100644 --- ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out +++ ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out @@ -65,7 +65,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: ROW__ID (type: struct) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Map 7 Map Operator Tree: TableScan @@ -336,7 +336,7 @@ STAGE PLANS: Map-reduce partition columns: a (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan @@ -526,7 +526,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: ROW__ID (type: struct) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Map 9 Map Operator Tree: TableScan diff --git ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out index 788854a..6799d69 100644 --- ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out +++ ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out @@ -106,7 +106,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: 
llap Reduce Operator Tree: @@ -200,7 +200,7 @@ STAGE PLANS: Statistics: Num rows: 800 Data size: 347200 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col3 (type: string) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -386,7 +386,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -479,7 +479,7 @@ STAGE PLANS: Map-reduce partition columns: _col3 (type: string) Statistics: Num rows: 800 Data size: 347200 Basic stats: COMPLETE Column stats: PARTIAL Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -675,7 +675,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -770,7 +770,7 @@ STAGE PLANS: Statistics: Num rows: 1600 Data size: 556800 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col4 (type: int) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -894,7 +894,7 @@ STAGE PLANS: Statistics: Num rows: 1600 Data size: 422400 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col1 (type: string), _col2 (type: int) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1087,7 +1087,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1181,7 +1181,7 @@ STAGE PLANS: Map-reduce partition columns: '2008-04-08' (type: string), _col4 (type: int) Statistics: Num rows: 1600 Data size: 556800 Basic stats: COMPLETE Column stats: PARTIAL Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1305,7 +1305,7 @@ STAGE PLANS: Map-reduce partition columns: _col1 (type: string), _col2 (type: int) Statistics: Num rows: 1600 Data size: 422400 Basic stats: COMPLETE Column stats: PARTIAL Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1500,7 +1500,7 @@ STAGE PLANS: Statistics: Num rows: 800 Data size: 280800 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col1 (type: string), 'bar' (type: string) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1596,7 +1596,7 @@ STAGE PLANS: Statistics: Num rows: 1600 Data size: 561600 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col1 (type: string), 'bar' (type: string) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: diff --git ql/src/test/results/clientpositive/llap/llap_acid.q.out 
ql/src/test/results/clientpositive/llap/llap_acid.q.out new file mode 100644 index 0000000..ca59f1d --- /dev/null +++ ql/src/test/results/clientpositive/llap/llap_acid.q.out @@ -0,0 +1,321 @@ +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap +POSTHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@orc_llap 
+POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 376 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 
+PREHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +PREHOOK: Output: default@orc_llap@csmallint=1 +PREHOOK: Output: default@orc_llap@csmallint=2 +PREHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: query: update orc_llap set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: Output: default@orc_llap@csmallint=3 +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap + filterExpr: cint is not null (type: boolean) + 
Statistics: Num rows: 20 Data size: 376 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap diff --git ql/src/test/results/clientpositive/llap/sqlmerge.q.out ql/src/test/results/clientpositive/llap/sqlmerge.q.out index c73e0d2..c3995b0 100644 --- ql/src/test/results/clientpositive/llap/sqlmerge.q.out +++ ql/src/test/results/clientpositive/llap/sqlmerge.q.out @@ -60,7 +60,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: ROW__ID (type: struct) Execution mode: llap - LLAP IO: may be used 
(ACID table) + LLAP IO: all inputs Map 7 Map Operator Tree: TableScan @@ -301,7 +301,7 @@ STAGE PLANS: Map-reduce partition columns: a (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan diff --git ql/src/test/results/clientpositive/llap_acid.q.out ql/src/test/results/clientpositive/llap_acid.q.out index 5970fd7..e53c3b9 100644 --- ql/src/test/results/clientpositive/llap_acid.q.out +++ ql/src/test/results/clientpositive/llap_acid.q.out @@ -50,11 +50,11 @@ POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc) POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] -PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') PREHOOK: type: ALTERTABLE_PROPERTIES PREHOOK: Input: default@orc_llap PREHOOK: Output: default@orc_llap -POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') POSTHOOK: type: ALTERTABLE_PROPERTIES POSTHOOK: Input: default@orc_llap POSTHOOK: Output: default@orc_llap @@ -105,7 +105,7 @@ STAGE PLANS: Statistics: Num rows: 20 Data size: 296 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) @@ -230,7 +230,7 @@ STAGE PLANS: Statistics: Num rows: 20 Data size: 296 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) diff --git ql/src/test/results/clientpositive/perf/query14.q.out ql/src/test/results/clientpositive/perf/query14.q.out index 9821180..21cabbc 100644 --- ql/src/test/results/clientpositive/perf/query14.q.out +++ ql/src/test/results/clientpositive/perf/query14.q.out @@ -1,9 +1,9 @@ Warning: Shuffle Join MERGEJOIN[916][tables = [$hdt$_1, $hdt$_2]] in Stage 'Reducer 114' is a cross product Warning: Shuffle Join MERGEJOIN[917][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 115' is a cross product -Warning: Shuffle Join MERGEJOIN[914][tables = [$hdt$_1, $hdt$_2]] in Stage 'Reducer 61' is a cross product -Warning: Shuffle Join MERGEJOIN[915][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 62' is a cross product Warning: Shuffle Join MERGEJOIN[912][tables = [$hdt$_1, $hdt$_2]] in Stage 'Reducer 5' is a cross product Warning: Shuffle Join MERGEJOIN[913][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 6' is a cross product +Warning: Shuffle Join MERGEJOIN[914][tables = [$hdt$_1, $hdt$_2]] in Stage 'Reducer 61' is a 
cross product +Warning: Shuffle Join MERGEJOIN[915][tables = [$hdt$_1, $hdt$_2, $hdt$_0]] in Stage 'Reducer 62' is a cross product PREHOOK: query: explain with cross_items as (select i_item_sk ss_item_sk
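As a closing illustration of the visibility changes to OrcRecordUpdater and OrcStruct earlier in this patch: code outside the package can now unpack an ACID event struct with the public field-index constants and accessors. A minimal sketch under that assumption follows; the describe() helper and its class are hypothetical and not part of the patch.

    import org.apache.hadoop.hive.ql.io.RecordIdentifier;
    import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
    import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
    import org.apache.hadoop.io.IntWritable;

    public class AcidEventSketch {
      /** Summarizes one ACID event struct laid out per OrcRecordUpdater's field order. */
      public static String describe(OrcStruct event) {
        int operation =
            ((IntWritable) event.getFieldValue(OrcRecordUpdater.OPERATION)).get();
        RecordIdentifier id = new RecordIdentifier(
            OrcRecordUpdater.getOriginalTransaction(event),
            OrcRecordUpdater.getBucket(event),
            OrcRecordUpdater.getRowId(event));
        OrcStruct row = OrcRecordUpdater.getRow(event); // null for delete events
        String kind = operation == OrcRecordUpdater.DELETE_OPERATION ? "delete"
            : operation == OrcRecordUpdater.UPDATE_OPERATION ? "update" : "insert";
        return kind + " of " + id + " in txn " + OrcRecordUpdater.getCurrentTransaction(event)
            + (row == null ? " (no row payload)" : "");
      }
    }
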