diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index 42129b7511..88c1a4cb6b 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hive.llap.io.api; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; public interface LlapIo { - InputFormat getInputFormat(InputFormat sourceInputFormat, Deserializer serde); + InputFormat getInputFormat( + InputFormat sourceInputFormat, Deserializer serde, Configuration conf); void close(); String getMemoryInfo(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java index 2cc18c86b2..69c37c9195 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java @@ -28,6 +28,7 @@ public class ColumnVectorBatch { public ColumnVector[] cols; public int size; + public int rowNumber; public ColumnVectorBatch(int columnCount) { this(columnCount, VectorizedRowBatch.DEFAULT_SIZE); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 22ca025e90..cd64087842 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -138,7 +138,7 @@ InputSplit split, JobConf job, Reporter reporter) throws IOException { boolean useLlapIo = true; if (split instanceof LlapAwareSplit) { - useLlapIo = ((LlapAwareSplit) split).canUseLlapIo(); + useLlapIo = ((LlapAwareSplit) split).canUseLlapIo(job); } if (useLlapIo) return null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 53c9bae5c1..7f65db5d73 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -77,6 +77,7 @@ // TODO: later, we may have a map private final ColumnVectorProducer orcCvp, genericCvp; + private final ColumnVectorProducer acidRowBatchOrcCvp; private final ExecutorService executor; private final LlapDaemonCacheMetrics cacheMetrics; private final LlapDaemonIOMetrics ioMetrics; @@ -185,8 +186,10 @@ private LlapIoImpl(Configuration conf) throws IOException { new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()); // TODO: this should depends on input format and be in a map, or something. + this.acidRowBatchOrcCvp = new OrcColumnVectorProducer( + metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, true); this.orcCvp = new OrcColumnVectorProducer( - metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics); + metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, false); this.genericCvp = isEncodeEnabled ? 
new GenericColumnVectorProducer( serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics) : null; LOG.info("LLAP IO initialized"); @@ -209,10 +212,14 @@ public String getMemoryInfo() { @SuppressWarnings("rawtypes") @Override public InputFormat getInputFormat( - InputFormat sourceInputFormat, Deserializer sourceSerDe) { + InputFormat sourceInputFormat, Deserializer sourceSerDe, Configuration conf) { ColumnVectorProducer cvp = genericCvp; if (sourceInputFormat instanceof OrcInputFormat) { - cvp = orcCvp; // Special-case for ORC. + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) { + cvp = acidRowBatchOrcCvp; // Special case for ACID ORC. + } else { + cvp = orcCvp; // Special case for non-ACID ORC. + } } else if (cvp == null) { LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass()); return null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index bbfe856843..720d02f910 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -266,6 +266,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } value.selectedInUse = false; value.size = cvb.size; + value.rowNumber = cvb.rowNumber; if (wasFirst) { firstReturnTime = counters.startTimeCounter(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidRowBatchEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidRowBatchEncodedDataConsumer.java new file mode 100644 index 0000000000..db23ba0648 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidRowBatchEncodedDataConsumer.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.DeleteEventRegistry; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.BitSet; + +import static org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.findRecordsWithInvalidTransactionIds; + +/** + * OrcAcidRowBatchEncodedDataConsumer consumes data after merging the base, delta, and delete deltas. + */ +public class OrcAcidRowBatchEncodedDataConsumer extends OrcEncodedDataConsumer implements ReadPipeline { + private final InnerConsumer innerConsumer; + private final JobConf conf; + private final FileSplit split; + + public OrcAcidRowBatchEncodedDataConsumer( + Consumer consumer, int size, boolean skipCorrupt, + QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics, + JobConf conf, FileSplit split) throws IOException { + + super(consumer, size, skipCorrupt, counters, ioMetrics); + this.split = split; + this.conf = conf; + this.innerConsumer = new InnerConsumer(); + } + + @Override + protected void decodeBatch(Reader.OrcEncodedColumnBatch batch, + Consumer downstreamConsumer) { + innerConsumer.downstreamConsumer = downstreamConsumer; + super.decodeBatch(batch, innerConsumer); + } + + private class InnerConsumer implements Consumer { + Consumer downstreamConsumer; + DeleteEventRegistry deleteEventRegistry; + + InnerConsumer() { + // Clone readerOptions for deleteEvents. + Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf); + readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + Reader.Options deleteEventReaderOptions = readerOptions.clone(); + // Set the range on the deleteEventReaderOptions to 0 to Long.MAX_VALUE because + // we always want to read all the delete delta files. + deleteEventReaderOptions.range(0, Long.MAX_VALUE); + // Disable SARGs for deleteEventReaders, as SARGs have no meaning. + deleteEventReaderOptions.searchArgument(null, null); + OrcSplit orcSplit = (OrcSplit) split; + + try { + try { + // See if we can load all the delete events from all the delete deltas in memory... + deleteEventRegistry = + new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + } catch (VectorizedOrcAcidRowBatchReader.DeleteEventsOverflowMemoryException e) { + // If not, then create a set of hanging readers that do sort-merge to find the next + // smallest delete event on-demand. Caps the memory consumption to (some_const * no. + // of readers).
+ deleteEventRegistry = + new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void consumeData(ColumnVectorBatch data) { + BitSet selectedBitSet = new BitSet(data.size); + // Start with every row selected; the checks below clear the bits of rows to be skipped. + selectedBitSet.set(0, data.size); + + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + ValidTxnList validTxnList = + (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(data.cols, data.size, selectedBitSet, validTxnList); + + // Case 2- find rows which have been deleted. + try { + deleteEventRegistry.findDeletedRecords(data.cols, data.size, selectedBitSet); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Compact the batch so that only the rows still selected (valid and not deleted) remain. + if (selectedBitSet.cardinality() != data.size) { + data.size = selectedBitSet.cardinality(); + int i = 0; + for (int setBit = selectedBitSet.nextSetBit(0); setBit >= 0; + setBit = selectedBitSet.nextSetBit(setBit + 1)) { + for (ColumnVector columnVector : data.cols) { + columnVector.setElement(i, setBit, columnVector); + } + i++; + } + } + + downstreamConsumer.consumeData(data); + } + + @Override + public void setDone() { + downstreamConsumer.setDone(); + try { + deleteEventRegistry.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void setError(Throwable t) { + downstreamConsumer.setError(t); + try { + deleteEventRegistry.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 121e169fc6..3f204d4abb 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -52,19 +52,22 @@ private final LowLevelCache lowLevelCache; private final BufferUsageManager bufferManager; private final Configuration conf; + private final boolean isTransactionalRead; private boolean _skipCorrupt; // TODO: get rid of this private LlapDaemonCacheMetrics cacheMetrics; private LlapDaemonIOMetrics ioMetrics; public OrcColumnVectorProducer(OrcMetadataCache metadataCache, LowLevelCache lowLevelCache, BufferUsageManager bufferManager, - Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) { + Configuration conf, LlapDaemonCacheMetrics cacheMetrics, + LlapDaemonIOMetrics ioMetrics, boolean isTransactionalRead) { LlapIoImpl.LOG.info("Initializing ORC column vector producer"); this.metadataCache = metadataCache; this.lowLevelCache = lowLevelCache; this.bufferManager = bufferManager; this.conf = conf; + this.isTransactionalRead = isTransactionalRead; this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); this.cacheMetrics = cacheMetrics; this.ioMetrics = ioMetrics; @@ -75,13 +78,33 @@ public ReadPipeline createReadPipeline( Consumer consumer, FileSplit split, List columnIds, SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, TypeDescription readerSchema, InputFormat unused0, Deserializer unused1, - Reporter reporter, JobConf job, Map unused2) throws IOException { + Reporter reporter, JobConf job, Map pathToPartMap) throws IOException { + + final OrcEncodedDataConsumer edc; + final OrcEncodedDataReader reader; cacheMetrics.incrCacheReadRequests(); -
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(), - _skipCorrupt, counters, ioMetrics); - OrcEncodedDataReader reader = new OrcEncodedDataReader( - lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg, - columnNames, edc, counters, readerSchema); + + if (isTransactionalRead) { + readerSchema = + TypeDescription.createStruct(). + addField("operation", TypeDescription.createInt()). + addField("originalTransaction", TypeDescription.createLong()). + addField("bucket", TypeDescription.createInt()). + addField("rowId", TypeDescription.createLong()). + addField("currentTransaction", TypeDescription.createLong()). + addField("row", readerSchema); + LlapIoImpl.LOG.info("Initializing ORC ACID row batch encoded data consumer"); + edc = new OrcAcidRowBatchEncodedDataConsumer( + consumer, columnIds.size(), _skipCorrupt, counters, ioMetrics, job, split); + } else { + LlapIoImpl.LOG.info("Initializing ORC encoded data consumer"); + edc = new OrcEncodedDataConsumer( + consumer, columnIds.size(), _skipCorrupt, counters, ioMetrics); + } + + reader = new OrcEncodedDataReader( + lowLevelCache, bufferManager, metadataCache, conf, job, split, + columnIds, sarg, columnNames, edc, counters, readerSchema); edc.init(reader, reader); return edc; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 8d96e7b2c2..a35c303078 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -145,6 +145,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, ColumnVectorBatch cvb = cvbPool.take(); // assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split. cvb.size = batchSize; + cvb.rowNumber = rgIdx; for (int idx = 0; idx < columnReaders.length; ++idx) { TreeReader reader = columnReaders[idx]; if (cvb.cols[idx] == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java index f35030202f..59970b8d09 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java @@ -76,7 +76,7 @@ * so that the data produced after wrapping a vectorized reader would conform to the original OIs. 
*/ public abstract class BatchToRowReader - implements RecordReader { + implements RowNumberProvidingRecordReader { protected static final Logger LOG = LoggerFactory.getLogger(BatchToRowReader.class); private final NullWritable key; @@ -176,6 +176,11 @@ public void close() throws IOException { batch.cols = null; } + @Override + public long getRowNumber() throws IOException { + return rowInBatch + batch.rowNumber; + } + /* Routines for stubbing into Writables */ public static BooleanWritable nextBoolean(ColumnVector vector, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 21394c6aab..f7646c1225 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -287,7 +287,7 @@ public void configure(JobConf job) { throw new HiveException("Error creating SerDe for LLAP IO", e); } } - InputFormat wrappedIf = llapIo.getInputFormat(inputFormat, serde); + InputFormat wrappedIf = llapIo.getInputFormat(inputFormat, serde, conf); if (wrappedIf == null) { return inputFormat; // We cannot wrap; the cause is logged inside. } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java index ead4678f64..db29dd085d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.mapred.JobConf; + /** * Split that is aware that it could be executed in LLAP. Allows LlapInputFormat to do * a last-minute check to see of LLAP IO pipeline should be used for this particular split. * By default, there is no such check - whatever is sent in is attempted with LLAP IO. */ public interface LlapAwareSplit { - boolean canUseLlapIo(); + boolean canUseLlapIo(JobConf jobConf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RowNumberProvidingRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/RowNumberProvidingRecordReader.java new file mode 100644 index 0000000000..c095a9f0d6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/RowNumberProvidingRecordReader.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +/** + * A record reader that provides its current row number. + * @param + * @param + */ +public interface RowNumberProvidingRecordReader + extends org.apache.hadoop.mapred.RecordReader { + + /** + * Get the current row number of the record reader. 
+ * @return + * @throws IOException + */ + long getRowNumber() throws IOException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 0ef7c758d4..eea8519f0b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RowNumberProvidingRecordReader; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.SyntheticFileId; @@ -210,7 +211,7 @@ public boolean isAcidRead(Configuration conf, InputSplit inputSplit) { } private static class OrcRecordReader - implements org.apache.hadoop.mapred.RecordReader, + implements RowNumberProvidingRecordReader, StatsProvidingRecordReader { private final RecordReader reader; private final long offset; @@ -230,6 +231,7 @@ public boolean isAcidRead(Configuration conf, InputSplit inputSplit) { this.length = split.getLength(); this.reader = createReaderFromFile(file, conf, offset, length); this.stats = new SerDeStats(); + this.reader.hasNext(); } @Override @@ -274,6 +276,11 @@ public SerDeStats getStats() { stats.setRowCount(file.getNumberOfRows()); return stats; } + + @Override + public long getRowNumber() throws IOException { + return reader.getRowNumber(); + } } /** @@ -2024,7 +2031,7 @@ static Path findOriginalBucket(FileSystem fs, directory); } - static Reader.Options createOptionsForReader(Configuration conf) { + public static Reader.Options createOptionsForReader(Configuration conf) { /** * Do we have schema on read in the configuration variables? 
*/ diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806e70..d575e7cb43 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RowNumberProvidingRecordReader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; @@ -51,14 +56,14 @@ private static final Logger LOG = LoggerFactory.getLogger(OrcRawRecordMerger.class); - private final Configuration conf; - private final boolean collapse; - private final RecordReader baseReader; - private final ObjectInspector objectInspector; - private final long offset; - private final long length; - private final ValidTxnList validTxnList; - private final int columns; + private Configuration conf; + private boolean collapse; + private RowNumberProvidingRecordReader baseReader; + private ObjectInspector objectInspector; + private long offset; + private long length; + private ValidTxnList validTxnList; + private int columns; private ReaderKey prevKey = new ReaderKey(); // this is the key less than the lowest key we need to process private RecordIdentifier minKey; @@ -185,12 +190,13 @@ public String toString() { */ static class ReaderPair { OrcStruct nextRecord; - final Reader reader; - final RecordReader recordReader; - final ReaderKey key; - final RecordIdentifier maxKey; - final int bucket; - private final int statementId; + Configuration conf; + RowNumberProvidingRecordReader recordReader; + ReaderKey key; + RecordIdentifier maxKey; + int bucket; + int statementId; + int numOfCols; /** * Create a reader that reads from the first key larger than minKey to any @@ -206,25 +212,58 @@ public String toString() { * @throws IOException */ ReaderPair(ReaderKey key, Reader reader, int bucket, - RecordIdentifier minKey, RecordIdentifier maxKey, - ReaderImpl.Options options, int statementId) throws IOException { - this.reader = reader; + RecordIdentifier minKey, RecordIdentifier maxKey, ReaderImpl.Options options, + int statementId) throws IOException { + this(key, reader, bucket, minKey, maxKey, options, statementId, false); + } + + ReaderPair(ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, ReaderImpl.Options options, + int statementId, boolean isOriginal) throws IOException { + init(key, reader, bucket, minKey, maxKey, options, statementId, isOriginal); + } + + Void init(ReaderKey key, Reader reader, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, ReaderImpl.Options options, + int statementId, boolean isOriginal) throws IOException { this.key = key; this.maxKey = maxKey; this.bucket = bucket; // TODO use stripe statistics to jump over stripes - recordReader = reader.rowsOptions(options); + this.recordReader = getRecordReader(reader, options, isOriginal); this.statementId = statementId; + this.numOfCols = recordReader.createValue().getNumFields(); // advance the reader until we reach the minimum key do { next(nextRecord); } 
while (nextRecord != null && (minKey != null && key.compareRow(minKey) <= 0)); + return null; } - void next(OrcStruct next) throws IOException { - if (recordReader.hasNext()) { - nextRecord = (OrcStruct) recordReader.next(next); + RowNumberProvidingRecordReader getRecordReader(Reader reader, + org.apache.orc.Reader.Options options, boolean isOriginal) throws IOException { + + final Path path = reader.getPath(); + final OrcSplit orcSplit = new OrcSplit(path, null, options.getOffset(), options.getLength(), + new String[0], reader.getOrcTail(), isOriginal, false, Collections.emptyList(), -1, + reader.getRawDataSize()); + + final JobConf jobConf = new JobConf(); + AcidUtils.setTransactionalTableScan(jobConf, false); + HiveConf.setBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + + // TODO: Return a LLAP record reader + return (RowNumberProvidingRecordReader) + new OrcInputFormat().getRecordReader(orcSplit, jobConf, Reporter.NULL); + } + + Void next(OrcStruct next) throws IOException { + if (next == null) { + next = new OrcStruct(numOfCols); + } + nextRecord = next; + if (recordReader.next(null, nextRecord)) { // set the key key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord), OrcRecordUpdater.getBucket(nextRecord), @@ -242,10 +281,11 @@ void next(OrcStruct next) throws IOException { nextRecord = null; recordReader.close(); } + return null; } int getColumns() { - return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount(); + return numOfCols; } } @@ -254,21 +294,30 @@ int getColumns() { * It wraps the underlying reader's row with an ACID event object and * makes the relevant translations. */ - static final class OriginalReaderPair extends ReaderPair { + static class OriginalReaderPair extends ReaderPair { OriginalReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, Reader.Options options) throws IOException { - super(key, reader, bucket, minKey, maxKey, options, 0); + super(key, reader, bucket, minKey, maxKey, options, 0, true); } @Override - void next(OrcStruct next) throws IOException { - if (recordReader.hasNext()) { - long nextRowId = recordReader.getRowNumber(); + Void next(OrcStruct next) throws IOException { + OrcStruct row = null; + final boolean nullNext = next == null; + if (!nullNext) { + row = OrcRecordUpdater.getRow(next); + } + if (row == null) { + row = new OrcStruct(numOfCols); + } + + final long nextRowId = recordReader.getRowNumber(); + if (recordReader.next(null, row)) { // have to do initialization here, because the super's constructor // calls next and thus we need to initialize before our constructor // runs - if (next == null) { + if (nullNext) { nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); IntWritable operation = new IntWritable(OrcRecordUpdater.INSERT_OPERATION); @@ -281,8 +330,7 @@ void next(OrcStruct next) throws IOException { new IntWritable(bucket)); nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(nextRowId)); - nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(null)); + nextRecord.setFieldValue(OrcRecordUpdater.ROW, row); } else { nextRecord = next; ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) @@ -295,8 +343,7 @@ void next(OrcStruct next) throws IOException { .set(0); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); - nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(OrcRecordUpdater.getRow(next))); + nextRecord.setFieldValue(OrcRecordUpdater.ROW, row); } 
key.setValues(0L, bucket, nextRowId, 0L, 0); if (maxKey != null && key.compareRow(maxKey) > 0) { @@ -310,16 +357,16 @@ void next(OrcStruct next) throws IOException { nextRecord = null; recordReader.close(); } + return null; } @Override int getColumns() { - return reader.getTypes().get(0).getSubtypesCount(); + return numOfCols; } } - private final TreeMap readers = - new TreeMap(); + private TreeMap readers; // The reader that currently has the lowest key. private ReaderPair primary; @@ -334,7 +381,7 @@ int getColumns() { * @param options the options for reading with * @throws IOException */ - private void discoverOriginalKeyBounds(Reader reader, int bucket, + Void discoverOriginalKeyBounds(Reader reader, int bucket, Reader.Options options ) throws IOException { long rowLength = 0; @@ -358,6 +405,7 @@ private void discoverOriginalKeyBounds(Reader reader, int bucket, if (!isTail) { maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1); } + return null; } /** @@ -366,7 +414,7 @@ private void discoverOriginalKeyBounds(Reader reader, int bucket, * @param options the options for reading with * @throws IOException */ - private void discoverKeyBounds(Reader reader, + Void discoverKeyBounds(Reader reader, Reader.Options options) throws IOException { RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader); long offset = options.getOffset(); @@ -391,6 +439,7 @@ private void discoverKeyBounds(Reader reader, if (!isTail) { maxKey = keyIndex[firstStripe + stripeCount - 1]; } + return null; } /** @@ -399,7 +448,11 @@ private void discoverKeyBounds(Reader reader, * @param options options for the row reader * @return a cloned options object that is modified for the event reader */ - static Reader.Options createEventOptions(Reader.Options options) { + Reader.Options innerCreateEventOptions(Reader.Options options) { + return createEventOptions(options); + } + + public static Reader.Options createEventOptions(Reader.Options options) { Reader.Options result = options.clone(); result.range(options.getOffset(), Long.MAX_VALUE); result.include(options.getInclude()); @@ -434,11 +487,18 @@ private void discoverKeyBounds(Reader reader, ValidTxnList validTxnList, Reader.Options options, Path[] deltaDirectory) throws IOException { + init(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, options, deltaDirectory); + } + + Void init(Configuration conf, boolean collapseEvents, Reader reader, boolean isOriginal, + int bucket, ValidTxnList validTxnList, Reader.Options options, Path[] deltaDirectory) + throws IOException { this.conf = conf; this.collapse = collapseEvents; this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; + this.readers = new TreeMap(); TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); @@ -447,7 +507,7 @@ private void discoverKeyBounds(Reader reader, (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); // modify the options to reflect the event instead of the base row - Reader.Options eventOptions = createEventOptions(options); + Reader.Options eventOptions = innerCreateEventOptions(options); if (reader == null) { baseReader = null; } else { @@ -462,14 +522,7 @@ private void discoverKeyBounds(Reader reader, // use the min/max instead of the byte range ReaderPair pair; ReaderKey key = new ReaderKey(); - if (isOriginal) { - options = options.clone(); - pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey, - options); - } else { - pair = new 
ReaderPair(key, reader, bucket, minKey, maxKey, - eventOptions, 0); - } + pair = getReaderPair(isOriginal, options, eventOptions, key, reader, bucket); // if there is at least one record, put it in the map if (pair.nextRecord != null) { @@ -502,8 +555,9 @@ private void discoverKeyBounds(Reader reader, } } ReaderPair deltaPair; - deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, - maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); + deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, maxKey, + deltaEventOptions != null ? deltaEventOptions : eventOptions, + deltaDir.getStatementId()); if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } @@ -526,6 +580,19 @@ private void discoverKeyBounds(Reader reader, // get the number of columns in the user's rows columns = primary.getColumns(); } + return null; + } + + protected ReaderPair getReaderPair(boolean isOriginal, + ReaderImpl.Options options, ReaderImpl.Options eventOptions, + ReaderKey key, Reader reader, int bucket) throws IOException { + + if (isOriginal) { + options = options.clone(); + return new OriginalReaderPair(key, reader, bucket, minKey, maxKey, options); + } else { + return new ReaderPair(key, reader, bucket, minKey, maxKey, eventOptions, 0); + } } @VisibleForTesting diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index d61b24bef3..bd21c46548 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -27,13 +27,17 @@ import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.ColumnarSplit; import org.apache.hadoop.hive.ql.io.LlapAwareSplit; import org.apache.hadoop.hive.ql.io.SyntheticFileId; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; import org.apache.orc.OrcProto; import org.apache.orc.impl.OrcTail; import org.slf4j.Logger; @@ -217,8 +221,39 @@ public long getColumnarProjectionSize() { } @Override - public boolean canUseLlapIo() { - return isOriginal && (deltas == null || deltas.isEmpty()); + public boolean canUseLlapIo(JobConf jobConf) { + // Support pure originals + if (isOriginal && (deltas == null || deltas.isEmpty())) { + return true; + } + + // Support split-update partitioned ACIDs + if (VectorizedOrcAcidRowBatchReader. 
+ canCreateVectorizedAcidRowBatchReaderOnSplit(jobConf, this)) { + final MapWork mapWork = Utilities.getMapWork(jobConf); + if (mapWork == null) { + return false; + } + PartitionDesc oldPartition = null; + for (PartitionDesc partitionDesc : mapWork.getPartitionDescs()) { + + // Must have one partition description + if (oldPartition != null) { + if (oldPartition != partitionDesc) { + return false; + } + } + oldPartition = partitionDesc; + + // Must be partitioned + if (!partitionDesc.isPartitioned()) { + return false; + } + } + return true; + } else { + return false; + } } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 8823e216a1..ab095a0892 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -20,8 +20,10 @@ import java.io.IOException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.orc.impl.OrcTail; /** * The interface for reading ORC files. @@ -98,4 +100,15 @@ RecordReader rows(long offset, long length, boolean[] include, SearchArgument sarg, String[] neededColumns) throws IOException; + /** + * Get the path. + * @return + */ + Path getPath(); + + /** + * Get the orc file tail object. + * @return + */ + OrcTail getOrcTail(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index cbbbb150b6..7ec128bc59 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.orc.TypeDescription; +import org.apache.orc.impl.OrcTail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +101,16 @@ public RecordReader rows(long offset, long length, boolean[] include, } @Override + public Path getPath() { + return path; + } + + @Override + public OrcTail getOrcTail() { + return tail; + } + + @Override public String toString() { return "Hive " + super.toString(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680e26..04b8537bb9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -163,7 +164,7 @@ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, return false; // no split-update or possibly reading originals! 
} - private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + public static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { Path path = orcSplit.getPath(); Path root; if (orcSplit.hasBase()) { @@ -206,6 +207,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti // We will go through the batch to discover rows which match any of the cases and specifically // remove them from the selected vector. Of course, selectedInUse should also be set. + // TODO: Replace BitSet objects with selected integer array BitSet selectedBitSet = new BitSet(vectorizedRowBatchBase.size); if (vectorizedRowBatchBase.selectedInUse) { // When selectedInUse is true, start with every bit set to false and selectively set @@ -221,10 +223,12 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + findRecordsWithInvalidTransactionIds( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet, validTxnList); // Case 2- find rows which have been deleted. - this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet); + this.deleteEventRegistry.findDeletedRecords( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { // None of the cases above matched and everything is selected. Hence, we will use the @@ -257,19 +261,20 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti return true; } - private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { - if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { + public static void findRecordsWithInvalidTransactionIds( + ColumnVector[] cols, int size, BitSet selectedBitSet, ValidTxnList validTxnList) { + if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { // When we have repeating values, we can unset the whole bitset at once // if the repeating value is not a valid transaction. long currentTransactionIdForBatch = ((LongColumnVector) - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; + cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { - selectedBitSet.clear(0, batch.size); + selectedBitSet.clear(0, size); } return; } long[] currentTransactionVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; // Loop through the bits that are set to true and mark those rows as false, if their // current transactions are not valid. for (int setBitIndex = selectedBitSet.nextSetBit(0); @@ -278,7 +283,7 @@ private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitS if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) { selectedBitSet.clear(setBitIndex); } - } + } } @Override @@ -321,15 +326,16 @@ DeleteEventRegistry getDeleteEventRegistry() { * will read the delete delta files and will create their own internal * data structures to maintain record ids of the records that got deleted. */ - static interface DeleteEventRegistry { + public static interface DeleteEventRegistry { /** * Modifies the passed bitset to indicate which of the rows in the batch * have been deleted. 
Assumes that the batch.size is equal to bitset size. - * @param batch + * @param cols + * @param size * @param selectedBitSet * @throws IOException */ - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException; + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException; /** * The close() method can be called externally to signal the implementing classes @@ -346,7 +352,7 @@ DeleteEventRegistry getDeleteEventRegistry() { * amount of memory usage, given the number of delete delta files. Therefore, this * implementation will be picked up when the memory pressure is high. */ - static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { + public static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { private OrcRawRecordMerger deleteRecords; private OrcRawRecordMerger.ReaderKey deleteRecordKey; private OrcStruct deleteRecordValue; @@ -375,29 +381,29 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (!isDeleteRecordAvailable) { return; } long[] originalTransaction = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long[] bucket = - batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector; + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; long[] rowId = - batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; // The following repeatedX values will be set, if any of the columns are repeating. long repeatedOriginalTransaction = (originalTransaction != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long repeatedBucket = (bucket != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; long repeatedRowId = (rowId != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector[0]; // Get the first valid row in the batch still available. @@ -412,7 +418,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); // Get the last valid row in the batch still available. - int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1); + int lastValidIndex = selectedBitSet.previousSetBit(size - 1); RecordIdentifier lastRecordIdInBatch = new RecordIdentifier( originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, @@ -482,7 +488,7 @@ public void close() throws IOException { * heuristic that prevents creation of an instance of this class if the memory pressure is high. 
* The SortMergedDeleteEventRegistry is then the fallback method for such scenarios. */ - static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { + public static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** * A simple wrapper class to hold the (otid, rowId) pair. */ @@ -775,7 +781,7 @@ private boolean isDeleted(long otid, long rowId) { } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (rowIds == null || compressedOtids == null) { return; @@ -784,13 +790,13 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) // check if it is deleted or not. long[] originalTransactionVector = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long[] rowIdVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; for (int setBitIndex = selectedBitSet.nextSetBit(0); setBitIndex >= 0; @@ -816,7 +822,7 @@ public void close() throws IOException { } } - static class DeleteEventsOverflowMemoryException extends Exception { + public static class DeleteEventsOverflowMemoryException extends Exception { private static final long serialVersionUID = 1L; } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 1ce1bfb1dd..35bcc8b76c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.ql.io.RowNumberProvidingRecordReader; import org.apache.orc.CompressionKind; import org.apache.orc.MemoryManager; import org.apache.orc.StripeInformation; import org.apache.orc.impl.MemoryManagerImpl; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -38,10 +41,8 @@ import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.IntWritable; @@ -151,45 +152,64 @@ private static String value(OrcStruct event) { private final Path tmpDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test" + 
File.separator + "tmp")); - private Reader createMockReader() throws IOException { - Reader reader = Mockito.mock(Reader.class, settings); - RecordReader recordReader = Mockito.mock(RecordReader.class, settings); - OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS); - setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first"); - OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS); - setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second"); - OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS); - setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third"); - OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS); - setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth"); - OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS); - setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth"); - Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class))) - .thenReturn(recordReader); - - Mockito.when(recordReader.hasNext()). - thenReturn(true, true, true, true, true, false); - - Mockito.when(recordReader.getProgress()).thenReturn(1.0f); - - Mockito.when(recordReader.next(null)).thenReturn(row1); - Mockito.when(recordReader.next(row1)).thenReturn(row2); - Mockito.when(recordReader.next(row2)).thenReturn(row3); - Mockito.when(recordReader.next(row3)).thenReturn(row4); - Mockito.when(recordReader.next(row4)).thenReturn(row5); - - return reader; + private ReaderPair createMockReaderPair(ReaderKey key, int bucket, + RecordIdentifier minKey, RecordIdentifier maxKey, + ReaderImpl.Options options, int statementId) throws IOException { + + // Record reader + RowNumberProvidingRecordReader recordReader = Mockito.mock(RowNumberProvidingRecordReader.class, settings); + Mockito.when(recordReader.getProgress()).thenReturn(1f); + Mockito.when(recordReader.createValue()).thenReturn(new OrcStruct(OrcRecordUpdater.FIELDS)); + Mockito.when(recordReader.next(Mockito.any(), Mockito.any())).then(new Answer() { + int i = 0; + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + OrcStruct row = (OrcStruct) invocation.getArguments()[1]; + switch (i) { + case 0: + setRow(row, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first"); + break; + case 1: + setRow(row, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second"); + break; + case 2: + setRow(row, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third"); + break; + case 3: + setRow(row, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth"); + break; + case 4: + setRow(row, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth"); + break; + } + i++; + return i <= 5; + } + }); + + // Reader pair + ReaderPair readerPair = Mockito.mock(ReaderPair.class, settings); + Mockito.when(readerPair.getRecordReader( + Mockito.any(), Mockito.any(), Mockito.anyBoolean())). + thenReturn(recordReader); + Mockito.when(readerPair.init(Mockito.any(), Mockito.any(), + Mockito.anyInt(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean())). 
+        thenCallRealMethod();
+    Mockito.when(readerPair.next(Mockito.any(OrcStruct.class))).thenCallRealMethod();
+
+    readerPair.init(key, null, bucket, minKey, maxKey, options, statementId,
+        false);
+    return readerPair;
   }
 
   @Test
   public void testReaderPair() throws Exception {
     ReaderKey key = new ReaderKey();
-    Reader reader = createMockReader();
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
-    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
-        new Reader.Options(), 0);
-    RecordReader recordReader = pair.recordReader;
+    ReaderPair pair = createMockReaderPair(key, 20, minKey, maxKey, new Reader.Options(), 0);
+    RowNumberProvidingRecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
     assertEquals(40, key.getRowId());
@@ -211,11 +231,9 @@ public void testReaderPair() throws Exception {
 
   @Test
   public void testReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
-    Reader reader = createMockReader();
-    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
-        new Reader.Options(), 0);
-    RecordReader recordReader = pair.recordReader;
+    ReaderPair pair = createMockReaderPair(key, 20, null, null, new Reader.Options(), 0);
+    RowNumberProvidingRecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
     assertEquals(20, key.getRowId());
@@ -255,44 +273,71 @@ public void testReaderPairNoMin() throws Exception {
     Mockito.verify(recordReader).close();
   }
 
-  private static OrcStruct createOriginalRow(String value) {
-    OrcStruct result = new OrcStruct(1);
-    result.setFieldValue(0, new Text(value));
-    return result;
+  private static void setOriginalRow(OrcStruct row, String value) {
+    row.setFieldValue(0, new Text(value));
   }
 
-  private Reader createMockOriginalReader() throws IOException {
-    Reader reader = Mockito.mock(Reader.class, settings);
-    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
-    OrcStruct row1 = createOriginalRow("first");
-    OrcStruct row2 = createOriginalRow("second");
-    OrcStruct row3 = createOriginalRow("third");
-    OrcStruct row4 = createOriginalRow("fourth");
-    OrcStruct row5 = createOriginalRow("fifth");
-
-    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
-        .thenReturn(recordReader);
-    Mockito.when(recordReader.hasNext()).
-        thenReturn(true, true, true, true, true, false);
+  private OriginalReaderPair createMockOriginalReaderPair(ReaderKey key, int bucket,
+      RecordIdentifier minKey, RecordIdentifier maxKey,
+      ReaderImpl.Options options) throws IOException {
+
+    // Record reader
+    RowNumberProvidingRecordReader recordReader = Mockito.mock(RowNumberProvidingRecordReader.class, settings);
+    Mockito.when(recordReader.getProgress()).thenReturn(1f);
+    Mockito.when(recordReader.createValue()).thenReturn(new OrcStruct(OrcRecordUpdater.FIELDS));
+    Mockito.when(recordReader.next(Mockito.any(NullWritable.class), Mockito.any(OrcStruct.class))).
+        then(new Answer() {
+          int i = 0;
+          @Override
+          public Boolean answer(InvocationOnMock invocation) throws Throwable {
+            OrcStruct row = (OrcStruct) invocation.getArguments()[1];
+            switch (i) {
+              case 0:
+                setOriginalRow(row, "first");
+                break;
+              case 1:
+                setOriginalRow(row, "second");
+                break;
+              case 2:
+                setOriginalRow(row, "third");
+                break;
+              case 3:
+                setOriginalRow(row, "fourth");
+                break;
+              case 4:
+                setOriginalRow(row, "fifth");
+                break;
+            }
+            i++;
+            return i <= 5;
+          }
+        });
+
+    // Reader pair
+    OriginalReaderPair readerPair = Mockito.mock(OriginalReaderPair.class, settings);
+    Mockito.when(readerPair.getRecordReader(
+        Mockito.any(), Mockito.any(), Mockito.anyBoolean())).
+        thenReturn(recordReader);
+    Mockito.when(readerPair.init(Mockito.any(), Mockito.any(),
+        Mockito.anyInt(), Mockito.any(), Mockito.any(),
+        Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean())).
+        thenCallRealMethod();
+    Mockito.when(readerPair.next(Mockito.any(OrcStruct.class))).thenCallRealMethod();
     Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L);
-    Mockito.when(recordReader.next(null)).thenReturn(row1);
-    Mockito.when(recordReader.next(row1)).thenReturn(row2);
-    Mockito.when(recordReader.next(row2)).thenReturn(row3);
-    Mockito.when(recordReader.next(row3)).thenReturn(row4);
-    Mockito.when(recordReader.next(row4)).thenReturn(row5);
-    return reader;
+
+    readerPair.init(key, null, bucket, minKey, maxKey, options, 0, false);
+    return readerPair;
   }
 
   @Test
   public void testOriginalReaderPair() throws Exception {
     ReaderKey key = new ReaderKey();
-    Reader reader = createMockOriginalReader();
     RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
     RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
     boolean[] includes = new boolean[]{true, true};
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
+    ReaderPair pair = createMockOriginalReaderPair(key, 10, minKey, maxKey,
         new Reader.Options().include(includes));
-    RecordReader recordReader = pair.recordReader;
+    RowNumberProvidingRecordReader recordReader = pair.recordReader;
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(2, key.getRowId());
@@ -318,8 +363,7 @@ private static ValidTxnList createMaximalTxnList() {
   @Test
   public void testOriginalReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
-    Reader reader = createMockOriginalReader();
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
+    ReaderPair pair = createMockOriginalReaderPair(key, 10, null, null,
         new Reader.Options());
     assertEquals("first", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
@@ -367,7 +411,6 @@ public void testNewBase() throws Exception {
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "string");
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
     Reader reader = Mockito.mock(Reader.class, settings);
-    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
 
     List types = new ArrayList();
     OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
@@ -391,30 +434,6 @@ public void testNewBase() throws Exception {
     types.add(typeBuilder.build());
 
     Mockito.when(reader.getTypes()).thenReturn(types);
-    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
-        .thenReturn(recordReader);
-
-    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
-    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
-    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
-    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
-    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
-    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
-
-    Mockito.when(recordReader.hasNext()).
-        thenReturn(true, true, true, true, true, false);
-
-    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);
-
-    Mockito.when(recordReader.next(null)).thenReturn(row1, row4);
-    Mockito.when(recordReader.next(row1)).thenReturn(row2);
-    Mockito.when(recordReader.next(row2)).thenReturn(row3);
-    Mockito.when(recordReader.next(row3)).thenReturn(row5);
-
     Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
         .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
         .getBytes("UTF-8")));
@@ -423,8 +442,20 @@ public void testNewBase() throws Exception {
     OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
         false, 10, createMaximalTxnList(),
-        new Reader.Options().range(1000, 1000), null);
-    RecordReader rr = merger.getCurrentReader().recordReader;
+        new Reader.Options().range(1000, 1000), null) {
+      @Override
+      protected ReaderPair getReaderPair(boolean isOriginal, ReaderImpl.Options options,
+          ReaderImpl.Options eventOptions, ReaderKey key, Reader reader, int bucket)
+          throws IOException {
+        if (isOriginal) {
+          return createMockOriginalReaderPair(key, bucket, getMinKey(), getMaxKey(), options);
+        } else {
+          return createMockReaderPair(key, bucket, getMinKey(), getMaxKey(), eventOptions, 0);
+        }
+      }
+    };
+
+    RowNumberProvidingRecordReader rr = merger.getCurrentReader().recordReader;
     assertEquals(0, merger.getOtherReaders().size());
 
     assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
diff --git ql/src/test/queries/clientpositive/llap_acid.q ql/src/test/queries/clientpositive/llap_acid.q
index 6bd216a55f..ca2005a590 100644
--- ql/src/test/queries/clientpositive/llap_acid.q
+++ ql/src/test/queries/clientpositive/llap_acid.q
@@ -27,7 +27,7 @@ select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limi
 insert into table orc_llap partition (csmallint = 2)
 select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10;
 
-alter table orc_llap SET TBLPROPERTIES ('transactional'='true');
+alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
 
 insert into table orc_llap partition (csmallint = 3)
 select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10;
@@ -51,3 +51,39 @@ select cint, csmallint, cbigint from orc_llap where cint is not null order
 by csmallint, cint;
 
 DROP TABLE orc_llap;
+
+DROP TABLE orc_llap_2;
+
+CREATE TABLE orc_llap_2 (
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE)
+partitioned by (csmallint smallint)
+clustered by (cint) into 2 buckets stored as orc
+TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+
+insert into table orc_llap_2 partition (csmallint = 1)
+select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10;
+insert into table orc_llap_2 partition (csmallint = 2)
+select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10;
+insert into table orc_llap_2 partition (csmallint = 3)
+select cint,
cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10; + +explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint; +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint; + +insert into table orc_llap_2 partition (csmallint = 1) values (1, 1, 1, 1); + +update orc_llap_2 set cbigint = 2 where cint = 1; + +explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint; +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint; + +DROP TABLE orc_llap_2; diff --git ql/src/test/results/clientpositive/llap/llap_acid.q.out ql/src/test/results/clientpositive/llap/llap_acid.q.out new file mode 100644 index 0000000000..5288a09c14 --- /dev/null +++ ql/src/test/results/clientpositive/llap/llap_acid.q.out @@ -0,0 +1,636 @@ +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap +POSTHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, 
comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 376 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 
Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +PREHOOK: Output: default@orc_llap@csmallint=1 +PREHOOK: Output: default@orc_llap@csmallint=2 +PREHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: query: update orc_llap set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: 
Output: default@orc_llap@csmallint=2 +POSTHOOK: Output: default@orc_llap@csmallint=3 +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 376 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL 
+528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap +PREHOOK: query: DROP TABLE orc_llap_2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap_2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap_2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap_2 +POSTHOOK: query: CREATE TABLE orc_llap_2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap_2 +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_2@csmallint=2 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_2@csmallint=2 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc 
order by cdouble desc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_2@csmallint=3 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_2@csmallint=3 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap_2 + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 250 Data size: 4023 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 250 Data size: 1000 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 250 Data size: 1000 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 250 Data size: 1000 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 250 Data size: 1000 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 250 Data size: 1000 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Input: default@orc_llap_2@csmallint=1 +PREHOOK: Input: default@orc_llap_2@csmallint=2 +PREHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: 
select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Input: default@orc_llap_2@csmallint=1 +POSTHOOK: Input: default@orc_llap_2@csmallint=2 +POSTHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap_2 set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Input: default@orc_llap_2@csmallint=1 +PREHOOK: Input: default@orc_llap_2@csmallint=2 +PREHOOK: Input: default@orc_llap_2@csmallint=3 +PREHOOK: Output: default@orc_llap_2@csmallint=1 +PREHOOK: Output: default@orc_llap_2@csmallint=2 +PREHOOK: Output: default@orc_llap_2@csmallint=3 +POSTHOOK: query: update orc_llap_2 set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Input: default@orc_llap_2@csmallint=1 +POSTHOOK: Input: default@orc_llap_2@csmallint=2 +POSTHOOK: Input: default@orc_llap_2@csmallint=3 +POSTHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: Output: default@orc_llap_2@csmallint=2 +POSTHOOK: Output: default@orc_llap_2@csmallint=3 +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap_2 + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 428 Data size: 6863 
Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 428 Data size: 1712 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 428 Data size: 1712 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 428 Data size: 1712 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 428 Data size: 1712 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 428 Data size: 1712 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Input: default@orc_llap_2@csmallint=1 +PREHOOK: Input: default@orc_llap_2@csmallint=2 +PREHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Input: default@orc_llap_2@csmallint=1 +POSTHOOK: Input: default@orc_llap_2@csmallint=2 +POSTHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap_2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Output: default@orc_llap_2 +POSTHOOK: query: DROP TABLE orc_llap_2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Output: default@orc_llap_2 diff --git ql/src/test/results/clientpositive/llap_acid.q.out ql/src/test/results/clientpositive/llap_acid.q.out index 5970fd78cb..aed65475a3 100644 --- ql/src/test/results/clientpositive/llap_acid.q.out +++ ql/src/test/results/clientpositive/llap_acid.q.out @@ -50,11 +50,11 @@ POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc) POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] -PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') PREHOOK: type: ALTERTABLE_PROPERTIES PREHOOK: Input: default@orc_llap PREHOOK: Output: default@orc_llap -POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') POSTHOOK: type: ALTERTABLE_PROPERTIES POSTHOOK: Input: default@orc_llap POSTHOOK: Output: default@orc_llap @@ -303,3 +303,302 @@ POSTHOOK: query: DROP TABLE orc_llap POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@orc_llap POSTHOOK: Output: default@orc_llap +PREHOOK: query: DROP TABLE orc_llap_2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap_2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap_2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap_2 +POSTHOOK: query: CREATE TABLE orc_llap_2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap_2 +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_2@csmallint=2 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble 
asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_2@csmallint=2 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_2@csmallint=3 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_2@csmallint=3 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: orc_llap_2 + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 468 Data size: 5641 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 468 Data size: 5641 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 468 Data size: 5641 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 468 Data size: 5641 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: bigint) + Execution mode: vectorized + LLAP IO: may be used (ACID table) + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 468 Data size: 5641 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 468 Data size: 5641 Basic stats: COMPLETE Column stats: NONE + table: + input 
format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Input: default@orc_llap_2@csmallint=1 +PREHOOK: Input: default@orc_llap_2@csmallint=2 +PREHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Input: default@orc_llap_2@csmallint=1 +POSTHOOK: Input: default@orc_llap_2@csmallint=2 +POSTHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: query: insert into table orc_llap_2 partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_2 PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap_2 set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Input: default@orc_llap_2@csmallint=1 +PREHOOK: Input: default@orc_llap_2@csmallint=2 +PREHOOK: Input: default@orc_llap_2@csmallint=3 +PREHOOK: Output: default@orc_llap_2@csmallint=1 +PREHOOK: Output: default@orc_llap_2@csmallint=2 +PREHOOK: Output: default@orc_llap_2@csmallint=3 +POSTHOOK: query: update orc_llap_2 set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Input: default@orc_llap_2@csmallint=1 +POSTHOOK: Input: default@orc_llap_2@csmallint=2 +POSTHOOK: Input: default@orc_llap_2@csmallint=3 +POSTHOOK: Output: default@orc_llap_2@csmallint=1 +POSTHOOK: Output: default@orc_llap_2@csmallint=2 +POSTHOOK: Output: default@orc_llap_2@csmallint=3 +PREHOOK: query: explain +select cint, csmallint, 
cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: orc_llap_2 + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 647 Data size: 7780 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 647 Data size: 7780 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 647 Data size: 7780 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 647 Data size: 7780 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: bigint) + Execution mode: vectorized + LLAP IO: may be used (ACID table) + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 647 Data size: 7780 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 647 Data size: 7780 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Input: default@orc_llap_2@csmallint=1 +PREHOOK: Input: default@orc_llap_2@csmallint=2 +PREHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_2 where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Input: default@orc_llap_2@csmallint=1 +POSTHOOK: Input: default@orc_llap_2@csmallint=2 +POSTHOOK: Input: default@orc_llap_2@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap_2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap_2 +PREHOOK: Output: default@orc_llap_2 +POSTHOOK: query: DROP TABLE orc_llap_2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap_2 +POSTHOOK: Output: 
default@orc_llap_2
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 5e5f13ddc7..f379850c85 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -38,6 +38,7 @@
   public int[] selected;  // array of positions of selected values
   public int[] projectedColumns;
   public int projectionSize;
+  public long rowNumber;
 
   private int dataColumnCount;
   private int partitionColumnCount;
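
For context, a minimal consumer-side sketch (not part of the patch above) of how the new VectorizedRowBatch.rowNumber field might be used; it assumes rowNumber carries the source-file row offset of the first row of a dense batch (selectedInUse == false), and the helper class and method names are hypothetical.

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Illustrative helper only: maps a position inside a dense batch to its
// absolute row offset in the underlying file, under the assumption stated above.
public final class RowOffsets {
  private RowOffsets() {
  }

  public static long offsetOf(VectorizedRowBatch batch, int i) {
    if (i < 0 || i >= batch.size) {
      throw new IndexOutOfBoundsException("row " + i + " not in batch of size " + batch.size);
    }
    // Rows of a dense batch are consecutive in the source file,
    // so the offset of row i is the batch's starting row number plus i.
    return batch.rowNumber + i;
  }
}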