diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 7c309a4..2da49aa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -23,14 +23,13 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.ObjectName; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; +import org.apache.hadoop.hive.llap.io.decode.OrcAcidColumnVectorProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -66,8 +65,6 @@ import org.apache.hadoop.metrics2.util.MBeans; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapIoImpl implements LlapIo { @@ -80,13 +77,16 @@ // TODO: later, we may have a map private final ColumnVectorProducer orcCvp, genericCvp; + private final ColumnVectorProducer acidOrcCvp; private final ExecutorService executor; private final LlapDaemonCacheMetrics cacheMetrics; private final LlapDaemonIOMetrics ioMetrics; private ObjectName buddyAllocatorMXBean; private final Allocator allocator; + private final Configuration conf; private LlapIoImpl(Configuration conf) throws IOException { + this.conf = conf; String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE); boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode); LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none"); @@ -162,6 +162,8 @@ private LlapIoImpl(Configuration conf) throws IOException { metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer( serdeCache, bufferManager, conf, cacheMetrics, ioMetrics) : null; + this.acidOrcCvp = new OrcAcidColumnVectorProducer( + metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); LOG.info("LLAP IO initialized"); registerMXBeans(); @@ -177,7 +179,12 @@ private void registerMXBeans() { InputFormat sourceInputFormat, Deserializer sourceSerDe) { ColumnVectorProducer cvp = genericCvp; if (sourceInputFormat instanceof OrcInputFormat) { - cvp = orcCvp; // Special-case for ORC. + OrcInputFormat orcInputFormat = (OrcInputFormat) sourceInputFormat; + if (orcInputFormat.isAcidRead(conf)) { + cvp = acidOrcCvp; // Special case for ACID ORC. + } else { + cvp = orcCvp; // Special case for non-ACID ORC. + } } else if (cvp == null) { LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass()); return null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java new file mode 100644 index 0000000..a255448 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.cache.BufferUsageManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; +import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * OrcAcidColumnVectorProducer produces a ReadPipeline of ORC ACID data for LLAP. + */ +public class OrcAcidColumnVectorProducer implements ColumnVectorProducer { + + private final OrcMetadataCache metadataCache; + private final LowLevelCache baseCache; + private final BufferUsageManager bufferManager; + private final Configuration conf; + private boolean _skipCorrupt; // TODO: get rid of this + private LlapDaemonCacheMetrics cacheMetrics; + private LlapDaemonIOMetrics ioMetrics; + + public OrcAcidColumnVectorProducer( + OrcMetadataCache metadataCache, LowLevelCache baseCache, BufferUsageManager bufferManager, + Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) { + LlapIoImpl.LOG.info("Initializing ORC ACID column vector producer"); + + this.metadataCache = metadataCache; + this.baseCache = baseCache; + this.bufferManager = bufferManager; + this.conf = conf; + this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); + this.cacheMetrics = cacheMetrics; + this.ioMetrics = ioMetrics; + } + + @Override + public ReadPipeline createReadPipeline( + Consumer consumer, FileSplit split, List columnIds, + SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, + TypeDescription readerSchema, InputFormat unused0, Deserializer unused1, + Reporter reporter, JobConf job, Map unused2) throws IOException { + cacheMetrics.incrCacheReadRequests(); + OrcEncodedDataConsumer edc; + if (VectorizedOrcAcidRowBatchReader.isAcid(job, split)) { + // If the split is ACID, then use ORC ACID consumer + edc = new OrcAcidEncodedDataConsumer(consumer, columnIds.size(), + _skipCorrupt, counters, ioMetrics, job, split); + } else { + // If the split is non-ACID, then use ORC consumer + edc = new OrcEncodedDataConsumer( + consumer, columnIds.size(), _skipCorrupt, counters, ioMetrics); + } + // Note: we use global conf here and ignore JobConf. + OrcEncodedDataReader reader = new OrcEncodedDataReader(baseCache, bufferManager, + metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema); + edc.init(reader, reader); + return edc; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java new file mode 100644 index 0000000..5d5c560 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.BitSet; + +import static org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.findRecordsWithInvalidTransactionIds; + +/** + * OrcAcidEncodeDataConsumer consumes data after merging the base, delta, and delete delta. + */ +public class OrcAcidEncodedDataConsumer extends OrcEncodedDataConsumer implements ReadPipeline { + private final InnerConsumer innerConsumer = new InnerConsumer(); + private final JobConf conf; + private final FileSplit split; + + public OrcAcidEncodedDataConsumer( + Consumer consumer, int size, boolean skipCorrupt, + QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics, + JobConf conf, FileSplit split) throws IOException { + super(consumer, size, skipCorrupt, counters, ioMetrics); + this.split = split; + this.conf = conf; + } + + @Override + protected void decodeBatch(Reader.OrcEncodedColumnBatch batch, + Consumer downstreamConsumer) { + innerConsumer.downstreamConsumer = downstreamConsumer; + super.decodeBatch(batch, innerConsumer); + } + + private class InnerConsumer implements Consumer { + Consumer downstreamConsumer; + VectorizedOrcAcidRowBatchReader.DeleteEventRegistry deleteEventRegistry; + + InnerConsumer() { + // Clone readerOptions for deleteEvents. + Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf); + readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + Reader.Options deleteEventReaderOptions = readerOptions.clone(); + // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because + // we always want to read all the delete delta files. + deleteEventReaderOptions.range(0, Long.MAX_VALUE); + // Disable SARGs for deleteEventReaders, as SARGs have no meaning. + deleteEventReaderOptions.searchArgument(null, null); + OrcSplit orcSplit = (OrcSplit) split; + + try { + try { + // See if we can load all the delete events from all the delete deltas in memory... + deleteEventRegistry = new VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry( + conf, orcSplit, deleteEventReaderOptions); + } catch (VectorizedOrcAcidRowBatchReader.DeleteEventsOverflowMemoryException e) { + // If not, then create a set of hanging readers that do sort-merge to find the next smallest + // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). + deleteEventRegistry = new VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry( + conf, orcSplit, deleteEventReaderOptions); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void consumeData(ColumnVectorBatch data) { + BitSet selectedBitSet = new BitSet(data.size); + + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + ValidTxnList validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(data.cols, data.size, selectedBitSet, validTxnList); + + // Case 2- find rows which have been deleted. + try { + deleteEventRegistry.findDeletedRecords(data.cols, data.size, selectedBitSet); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Select only not deleted ones + int cardinality = selectedBitSet.cardinality(); + if (cardinality != data.size) { + data.size = cardinality; + for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0; + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) { + for (ColumnVector columnVector : data.cols) { + columnVector.setElement(selectedItr, setBitIndex, columnVector); + } + } + } + + downstreamConsumer.consumeData(data); + } + + @Override + public void setDone() { + downstreamConsumer.setDone(); + try { + deleteEventRegistry.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void setError(Throwable t) { + downstreamConsumer.setError(t); + try { + deleteEventRegistry.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 369584b..c6dd270 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -210,6 +210,13 @@ public boolean isAcidRead(Configuration conf, InputSplit inputSplit) { return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); } + public boolean isAcidRead(Configuration conf) { + /* + * Fallback for the case when OrcSplit flags do not contain hasBase and deltas + */ + return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); + } + private static class OrcRecordReader implements org.apache.hadoop.mapred.RecordReader, StatsProvidingRecordReader { @@ -1844,8 +1851,7 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte reporter.setStatus(inputSplit.toString()); - boolean isFastVectorizedReaderAvailable = - VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit); + boolean isFastVectorizedReaderAvailable = VectorizedOrcAcidRowBatchReader.isAcid(conf, inputSplit); if (vectorMode && isFastVectorizedReaderAvailable) { // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching. @@ -2030,7 +2036,7 @@ static Path findOriginalBucket(FileSystem fs, directory); } - static Reader.Options createOptionsForReader(Configuration conf) { + public static Reader.Options createOptionsForReader(Configuration conf) { /** * Do we have schema on read in the configuration variables? */ diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806..de0f0bc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -399,7 +399,7 @@ private void discoverKeyBounds(Reader reader, * @param options options for the row reader * @return a cloned options object that is modified for the event reader */ - static Reader.Options createEventOptions(Reader.Options options) { + public static Reader.Options createEventOptions(Reader.Options options) { Reader.Options result = options.clone(); result.range(options.getOffset(), Long.MAX_VALUE); result.include(options.getInclude()); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index d61b24b..eb6231f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -218,7 +218,7 @@ public long getColumnarProjectionSize() { @Override public boolean canUseLlapIo() { - return isOriginal && (deltas == null || deltas.isEmpty()); + return isOriginal; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680..3eba09b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.BitSet; -import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; @@ -32,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -42,15 +42,13 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; + +import com.google.common.annotations.VisibleForTesting; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; /** * A fast vectorized batch reader class for ACID when split-update behavior is enabled. * When split-update is turned on, row-by-row stitching could be avoided to create the final @@ -147,7 +145,7 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, * @param inputSplit * @return true if it is possible, else false. */ - public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) { + public static boolean isAcid(JobConf conf, InputSplit inputSplit) { if (!(inputSplit instanceof OrcSplit)) { return false; // must be an instance of OrcSplit. } @@ -163,7 +161,7 @@ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, return false; // no split-update or possibly reading originals! } - private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + public static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { Path path = orcSplit.getPath(); Path root; if (orcSplit.hasBase()) { @@ -221,10 +219,12 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + findRecordsWithInvalidTransactionIds( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet, validTxnList); // Case 2- find rows which have been deleted. - this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet); + this.deleteEventRegistry.findDeletedRecords( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { // None of the cases above matched and everything is selected. Hence, we will use the @@ -257,19 +257,20 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti return true; } - private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { - if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { + public static void findRecordsWithInvalidTransactionIds( + ColumnVector[] cols, int size, BitSet selectedBitSet, ValidTxnList validTxnList) { + if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { // When we have repeating values, we can unset the whole bitset at once // if the repeating value is not a valid transaction. long currentTransactionIdForBatch = ((LongColumnVector) - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; + cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { - selectedBitSet.clear(0, batch.size); + selectedBitSet.clear(0, size); } return; } long[] currentTransactionVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; // Loop through the bits that are set to true and mark those rows as false, if their // current transactions are not valid. for (int setBitIndex = selectedBitSet.nextSetBit(0); @@ -278,7 +279,7 @@ private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitS if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) { selectedBitSet.clear(setBitIndex); } - } + } } @Override @@ -321,22 +322,24 @@ DeleteEventRegistry getDeleteEventRegistry() { * will read the delete delta files and will create their own internal * data structures to maintain record ids of the records that got deleted. */ - static interface DeleteEventRegistry { + public interface DeleteEventRegistry { /** * Modifies the passed bitset to indicate which of the rows in the batch * have been deleted. Assumes that the batch.size is equal to bitset size. - * @param batch + * @param cols + * @param size * @param selectedBitSet * @throws IOException */ - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException; + public void findDeletedRecords( + ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException; /** * The close() method can be called externally to signal the implementing classes * to free up resources. * @throws IOException */ - public void close() throws IOException; + void close() throws IOException; } /** @@ -346,7 +349,7 @@ DeleteEventRegistry getDeleteEventRegistry() { * amount of memory usage, given the number of delete delta files. Therefore, this * implementation will be picked up when the memory pressure is high. */ - static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { + public static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { private OrcRawRecordMerger deleteRecords; private OrcRawRecordMerger.ReaderKey deleteRecordKey; private OrcStruct deleteRecordValue; @@ -375,29 +378,29 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (!isDeleteRecordAvailable) { return; } long[] originalTransaction = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long[] bucket = - batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector; + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; long[] rowId = - batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; // The following repeatedX values will be set, if any of the columns are repeating. long repeatedOriginalTransaction = (originalTransaction != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long repeatedBucket = (bucket != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; long repeatedRowId = (rowId != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector[0]; // Get the first valid row in the batch still available. @@ -412,7 +415,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); // Get the last valid row in the batch still available. - int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1); + int lastValidIndex = selectedBitSet.previousSetBit(size - 1); RecordIdentifier lastRecordIdInBatch = new RecordIdentifier( originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, @@ -482,7 +485,7 @@ public void close() throws IOException { * heuristic that prevents creation of an instance of this class if the memory pressure is high. * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios. */ - static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { + public static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** * A simple wrapper class to hold the (otid, rowId) pair. */ @@ -626,8 +629,9 @@ public int compareTo(CompressedOtid other) { private CompressedOtid compressedOtids[]; private ValidTxnList validTxnList; - public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, - Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + public ColumnizedDeleteEventRegistry( + JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) + throws IOException, DeleteEventsOverflowMemoryException { int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); @@ -775,7 +779,7 @@ private boolean isDeleted(long otid, long rowId) { } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (rowIds == null || compressedOtids == null) { return; @@ -784,13 +788,13 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) // check if it is deleted or not. long[] originalTransactionVector = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long[] rowIdVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; for (int setBitIndex = selectedBitSet.nextSetBit(0); setBitIndex >= 0; @@ -801,7 +805,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) if (isDeleted(otid, rowId)) { selectedBitSet.clear(setBitIndex); } - } + } } @Override @@ -816,7 +820,7 @@ public void close() throws IOException { } } - static class DeleteEventsOverflowMemoryException extends Exception { + public static class DeleteEventsOverflowMemoryException extends Exception { private static final long serialVersionUID = 1L; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 2120400..2d4d7d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -258,7 +258,7 @@ public void deriveExplainAttributes() { } public void deriveLlap(Configuration conf, boolean isExecDriver) { - boolean hasLlap = false, hasNonLlap = false, hasAcid = false; + boolean hasLlap = false, hasNonLlap = false; // Assume the IO is enabled on the daemon by default. We cannot reasonably check it here. boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode); boolean canWrapAny = false, doCheckIfs = false; @@ -278,12 +278,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) { boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap( part.getInputFileFormatClass(), doCheckIfs); if (isUsingLlapIo) { - if (part.getTableDesc() != null && - AcidUtils.isTablePropertyTransactional(part.getTableDesc().getProperties())) { - hasAcid = true; - } else { - hasLlap = true; - } + hasLlap = true; } else { hasNonLlap = true; } @@ -296,7 +291,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) { } llapIoDesc = deriveLlapIoDescString( - isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid); + isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap); } private boolean checkVectorizerSupportedTypes(boolean hasLlap) { @@ -321,11 +316,10 @@ private boolean checkVectorizerSupportedTypes(boolean hasLlap) { } private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, - boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) { + boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap) { if (!isLlapOn) return null; // LLAP IO is off, don't output. if (!canWrapAny) return "no inputs"; // Cannot use with input formats. if (!hasPathToPartInfo) return "unknown"; // No information to judge. - if (hasAcid) return "may be used (ACID table)"; return (hasLlap ? (hasNonLlap ? "some inputs" : "all inputs") : "no inputs"); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf1312..1957f9d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -247,17 +247,17 @@ public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getLegacy().toInt()); // Test false when trying to create a vectorized ACID row batch reader for a legacy table. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertFalse(VectorizedOrcAcidRowBatchReader.isAcid(conf, mockSplit)); conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getDefault().toInt()); Mockito.when(mockSplit.isOriginal()).thenReturn(true); // Test false when trying to create a vectorized ACID row batch reader when reading originals. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertFalse(VectorizedOrcAcidRowBatchReader.isAcid(conf, mockSplit)); // A positive test case. Mockito.when(mockSplit.isOriginal()).thenReturn(false); - assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertTrue(VectorizedOrcAcidRowBatchReader.isAcid(conf, mockSplit)); } } diff --git ql/src/test/queries/clientpositive/llap_acid.q ql/src/test/queries/clientpositive/llap_acid.q index 6bd216a..41d86af 100644 --- ql/src/test/queries/clientpositive/llap_acid.q +++ ql/src/test/queries/clientpositive/llap_acid.q @@ -27,7 +27,7 @@ select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limi insert into table orc_llap partition (csmallint = 2) select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10; -alter table orc_llap SET TBLPROPERTIES ('transactional'='true'); +alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); insert into table orc_llap partition (csmallint = 3) select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10; diff --git ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out index d05bf64..e54fdec 100644 --- ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out +++ ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out @@ -76,7 +76,7 @@ STAGE PLANS: GatherStats: false MultiFileSpray: false Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Path -> Alias: #### A masked pattern was here #### Path -> Partition: diff --git ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out index 788854a..6799d69 100644 --- ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out +++ ql/src/test/results/clientpositive/llap/dynpart_sort_optimization_acid.q.out @@ -106,7 +106,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -200,7 +200,7 @@ STAGE PLANS: Statistics: Num rows: 800 Data size: 347200 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col3 (type: string) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -386,7 +386,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -479,7 +479,7 @@ STAGE PLANS: Map-reduce partition columns: _col3 (type: string) Statistics: Num rows: 800 Data size: 347200 Basic stats: COMPLETE Column stats: PARTIAL Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -675,7 +675,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -770,7 +770,7 @@ STAGE PLANS: Statistics: Num rows: 1600 Data size: 556800 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col4 (type: int) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -894,7 +894,7 @@ STAGE PLANS: Statistics: Num rows: 1600 Data size: 422400 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col1 (type: string), _col2 (type: int) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1087,7 +1087,7 @@ STAGE PLANS: Map-reduce partition columns: UDFToInteger(_col0) (type: int) Statistics: Num rows: 800 Data size: 15400 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1181,7 +1181,7 @@ STAGE PLANS: Map-reduce partition columns: '2008-04-08' (type: string), _col4 (type: int) Statistics: Num rows: 1600 Data size: 556800 Basic stats: COMPLETE Column stats: PARTIAL Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1305,7 +1305,7 @@ STAGE PLANS: Map-reduce partition columns: _col1 (type: string), _col2 (type: int) Statistics: Num rows: 1600 Data size: 422400 Basic stats: COMPLETE Column stats: PARTIAL Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1500,7 +1500,7 @@ STAGE PLANS: Statistics: Num rows: 800 Data size: 280800 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col1 (type: string), 'bar' (type: string) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1596,7 +1596,7 @@ STAGE PLANS: Statistics: Num rows: 1600 Data size: 561600 Basic stats: COMPLETE Column stats: PARTIAL value expressions: _col1 (type: string), 'bar' (type: string) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: diff --git ql/src/test/results/clientpositive/llap/sqlmerge.q.out ql/src/test/results/clientpositive/llap/sqlmerge.q.out index c73e0d2..c3995b0 100644 --- ql/src/test/results/clientpositive/llap/sqlmerge.q.out +++ ql/src/test/results/clientpositive/llap/sqlmerge.q.out @@ -60,7 +60,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: ROW__ID (type: struct) Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Map 7 Map Operator Tree: TableScan @@ -301,7 +301,7 @@ STAGE PLANS: Map-reduce partition columns: a (type: int) Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE Execution mode: llap - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan diff --git ql/src/test/results/clientpositive/llap_acid.q.out ql/src/test/results/clientpositive/llap_acid.q.out index 5970fd7..e53c3b9 100644 --- ql/src/test/results/clientpositive/llap_acid.q.out +++ ql/src/test/results/clientpositive/llap_acid.q.out @@ -50,11 +50,11 @@ POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc) POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] -PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') PREHOOK: type: ALTERTABLE_PROPERTIES PREHOOK: Input: default@orc_llap PREHOOK: Output: default@orc_llap -POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') POSTHOOK: type: ALTERTABLE_PROPERTIES POSTHOOK: Input: default@orc_llap POSTHOOK: Output: default@orc_llap @@ -105,7 +105,7 @@ STAGE PLANS: Statistics: Num rows: 20 Data size: 296 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) @@ -230,7 +230,7 @@ STAGE PLANS: Statistics: Num rows: 20 Data size: 296 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint)