diff --git llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index e5ab601..b2f19e3 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -18,11 +18,12 @@ package org.apache.hadoop.hive.llap.io.api; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; public interface LlapIo { - InputFormat getInputFormat(InputFormat sourceInputFormat, Deserializer serde); + InputFormat getInputFormat(InputFormat sourceInputFormat, Deserializer serde, Configuration conf); void close(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 7c309a4..25a04a5 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -23,14 +23,13 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.ObjectName; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; +import org.apache.hadoop.hive.llap.io.decode.OrcAcidColumnVectorProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -66,8 +65,6 @@ import org.apache.hadoop.metrics2.util.MBeans; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LlapIoImpl implements LlapIo { @@ -80,6 +77,7 @@ // TODO: later, we may have a map private final ColumnVectorProducer orcCvp, genericCvp; + private final ColumnVectorProducer acidOrcCvp; private final ExecutorService executor; private final LlapDaemonCacheMetrics cacheMetrics; private final LlapDaemonIOMetrics ioMetrics; @@ -162,6 +160,8 @@ private LlapIoImpl(Configuration conf) throws IOException { metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer( serdeCache, bufferManager, conf, cacheMetrics, ioMetrics) : null; + this.acidOrcCvp = new OrcAcidColumnVectorProducer( + metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics); LOG.info("LLAP IO initialized"); registerMXBeans(); @@ -174,10 +174,15 @@ private void registerMXBeans() { @SuppressWarnings("rawtypes") @Override public InputFormat getInputFormat( - InputFormat sourceInputFormat, Deserializer sourceSerDe) { + InputFormat sourceInputFormat, Deserializer sourceSerDe, Configuration conf) { ColumnVectorProducer cvp = genericCvp; if (sourceInputFormat instanceof OrcInputFormat) { - cvp = orcCvp; // Special-case for ORC. + OrcInputFormat orcInputFormat = (OrcInputFormat) sourceInputFormat; + if (orcInputFormat.isAcidRead(conf)) { + cvp = acidOrcCvp; // Special case for ACID ORC. + } else { + cvp = orcCvp; // Special case for non-ACID ORC. 
+ } } else if (cvp == null) { LOG.warn("LLAP encode is disabled; cannot use for " + sourceInputFormat.getClass()); return null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java new file mode 100644 index 0000000..6e15d33 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidColumnVectorProducer.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.cache.BufferUsageManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; +import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.io.AcidMergeUtils; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * OrcAcidColumnVectorProducer produces a ReadPipeline of ORC ACID data for LLAP. 
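Illustration only, not part of the patch: a minimal sketch of how a caller is expected to use the widened LlapIo.getInputFormat signature now that it takes the job Configuration (this mirrors the HiveInputFormat change further down). The class and helper names here are made up; only the getInputFormat call itself comes from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.InputFormat;

public class LlapAcidWiringSketch {
  // Pass the job Configuration through so that LlapIoImpl can call
  // OrcInputFormat.isAcidRead(conf) and route ACID ORC reads to acidOrcCvp.
  @SuppressWarnings("rawtypes")
  static InputFormat wrapForLlap(LlapIo llapIo, InputFormat sourceInputFormat,
      Deserializer serde, Configuration conf) {
    InputFormat wrapped = llapIo.getInputFormat(sourceInputFormat, serde, conf);
    return wrapped != null ? wrapped : sourceInputFormat; // null means LLAP cannot wrap this format.
  }
}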
+ */ +public class OrcAcidColumnVectorProducer implements ColumnVectorProducer { + + private final OrcMetadataCache metadataCache; + private final LowLevelCache baseCache; + private final BufferUsageManager bufferManager; + private final Configuration conf; + private boolean _skipCorrupt; // TODO: get rid of this + private LlapDaemonCacheMetrics cacheMetrics; + private LlapDaemonIOMetrics ioMetrics; + + public OrcAcidColumnVectorProducer( + OrcMetadataCache metadataCache, LowLevelCache baseCache, BufferUsageManager bufferManager, + Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics) { + LlapIoImpl.LOG.info("Initializing ORC ACID column vector producer"); + + this.metadataCache = metadataCache; + this.baseCache = baseCache; + this.bufferManager = bufferManager; + this.conf = conf; + this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); + this.cacheMetrics = cacheMetrics; + this.ioMetrics = ioMetrics; + } + + @Override + public ReadPipeline createReadPipeline( + Consumer consumer, FileSplit split, List columnIds, + SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, + TypeDescription readerSchema, InputFormat unused0, Deserializer unused1, + Reporter reporter, JobConf job, Map unused2) throws IOException { + cacheMetrics.incrCacheReadRequests(); + OrcEncodedDataConsumer edc; + if (AcidMergeUtils.isAcid(job, split)) { + // If the split is ACID, then use ORC ACID consumer + edc = new OrcAcidEncodedDataConsumer(consumer, columnIds.size(), + _skipCorrupt, counters, ioMetrics, job, split); + } else { + // If the split is non-ACID, then use ORC consumer + edc = new OrcEncodedDataConsumer( + consumer, columnIds.size(), _skipCorrupt, counters, ioMetrics); + } + // Note: we use global conf here and ignore JobConf. + OrcEncodedDataReader reader = new OrcEncodedDataReader(baseCache, bufferManager, + metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters, readerSchema); + edc.init(reader, reader); + return edc; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java new file mode 100644 index 0000000..c1eebaf --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcAcidEncodedDataConsumer.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.io.AcidMergeUtils; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.BitSet; + +import static org.apache.hadoop.hive.ql.io.AcidMergeUtils.findRecordsWithInvalidTransactionIds; + +/** + * OrcAcidEncodeDataConsumer consumers data after merging the base data with ACID delta data. + */ +public class OrcAcidEncodedDataConsumer extends OrcEncodedDataConsumer implements ReadPipeline { + private final InnerConsumer innerConsumer = new InnerConsumer(); + private final JobConf conf; + private final FileSplit split; + + public OrcAcidEncodedDataConsumer( + Consumer consumer, int size, boolean skipCorrupt, + QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics, + JobConf conf, FileSplit split) throws IOException { + super(consumer, size, skipCorrupt, counters, ioMetrics); + this.split = split; + this.conf = conf; + } + + @Override + protected void decodeBatch(Reader.OrcEncodedColumnBatch batch, + Consumer downstreamConsumer) { + innerConsumer.downstreamConsumer = downstreamConsumer; + super.decodeBatch(batch, innerConsumer); + } + + private class InnerConsumer implements Consumer { + Consumer downstreamConsumer; + + @Override + public void consumeData(ColumnVectorBatch data) { + // Clone readerOptions for deleteEvents. + Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf); + readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + Reader.Options deleteEventReaderOptions = readerOptions.clone(); + // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because + // we always want to read all the delete delta files. + deleteEventReaderOptions.range(0, Long.MAX_VALUE); + // Disable SARGs for deleteEventReaders, as SARGs have no meaning. + deleteEventReaderOptions.searchArgument(null, null); + OrcSplit orcSplit = (OrcSplit) split; + + AcidMergeUtils.DeleteEventRegistry deleteEventRegistry; + try { + try { + // See if we can load all the delete events from all the delete deltas in memory... + deleteEventRegistry = new AcidMergeUtils.ColumnizedDeleteEventRegistry( + conf, orcSplit, deleteEventReaderOptions); + } catch (AcidMergeUtils.DeleteEventsOverflowMemoryException e) { + // If not, then create a set of hanging readers that do sort-merge to find the next smallest + // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). + deleteEventRegistry = new AcidMergeUtils.SortMergedDeleteEventRegistry( + conf, orcSplit, deleteEventReaderOptions); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + BitSet selectedBitSet = new BitSet(data.size); + + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + ValidTxnList validTxnList = (txnString == null) ? 
new ValidReadTxnList() : new ValidReadTxnList(txnString); + + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(data.cols, data.size, selectedBitSet, validTxnList); + + // Case 2- find rows which have been deleted. + try { + deleteEventRegistry.findDeletedRecords(data.cols, data.size, selectedBitSet); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Select only not deleted ones + int cardinality = selectedBitSet.cardinality(); + if (cardinality != data.size) { + data.size = cardinality; + for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0; + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) { + for (ColumnVector columnVector : data.cols) { + columnVector.setElement(selectedItr, setBitIndex, columnVector); + } + } + } + + downstreamConsumer.consumeData(data); + } + + @Override + public void setDone() { + downstreamConsumer.setDone(); + } + + @Override + public void setError(Throwable t) { + downstreamConsumer.setError(t); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidMergeUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidMergeUtils.java new file mode 100644 index 0000000..386cb3a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidMergeUtils.java @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Map; +import java.util.TreeMap; + +/** + * Utilities that merge ACID delta and bases. 
+ */ +public class AcidMergeUtils { + /** + * Returns whether it is possible to create a valid instance of this class for a given split. + * @param conf is the job configuration + * @param inputSplit + * @return true if it is possible, else false. + */ + public static boolean isAcid(JobConf conf, InputSplit inputSplit) { + if (!(inputSplit instanceof OrcSplit)) { + return false; // must be an instance of OrcSplit. + } + // First check if we are reading any original files in the split. + // To simplify the vectorization logic, the vectorized acid row batch reader does not handle + // original files for now as they have a different schema than a regular ACID file. + final OrcSplit split = (OrcSplit) inputSplit; + if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) { + // When split-update is turned on for ACID, a more optimized vectorized batch reader + // can be created. But still only possible when we are *NOT* reading any originals. + return true; + } + return false; // no split-update or possibly reading originals! + } + + + public static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + Path path = orcSplit.getPath(); + Path root; + if (orcSplit.hasBase()) { + if (orcSplit.isOriginal()) { + root = path.getParent(); + } else { + root = path.getParent().getParent(); + } + } else { + root = path; + } + return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas()); + } + + public static void findRecordsWithInvalidTransactionIds( + ColumnVector[] cols, int size, BitSet selectedBitSet, ValidTxnList validTxnList) { + if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { + // When we have repeating values, we can unset the whole bitset at once + // if the repeating value is not a valid transaction. + long currentTransactionIdForBatch = ((LongColumnVector) + cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; + if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { + selectedBitSet.clear(0, size); + } + return; + } + long[] currentTransactionVector = + ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; + // Loop through the bits that are set to true and mark those rows as false, if their + // current transactions are not valid. + for (int setBitIndex = selectedBitSet.nextSetBit(0); + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { + if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) { + selectedBitSet.clear(setBitIndex); + } + } + } + + /** + * An interface that can determine which rows have been deleted + * from a given vectorized row batch. Implementations of this interface + * will read the delete delta files and will create their own internal + * data structures to maintain record ids of the records that got deleted. + */ + public static interface DeleteEventRegistry { + /** + * Modifies the passed bitset to indicate which of the rows in the batch + * have been deleted. Assumes that the batch.size is equal to bitset size. + * @param cols + * @param size + * @param selectedBitSet + * @throws IOException + */ + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException; + + /** + * The close() method can be called externally to signal the implementing classes + * to free up resources. 
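Not part of the patch: a sketch of the two-step row filtering that both OrcAcidEncodedDataConsumer and VectorizedOrcAcidRowBatchReader perform with these utilities. The helper name selectLiveRows is hypothetical; the caller is assumed to pass the raw ACID event columns (operation, otid, bucket, rowId, currentTxn, row) and the serialized valid-transaction list read from the job conf.

import java.io.IOException;
import java.util.BitSet;

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.io.AcidMergeUtils;

public class AcidRowFilterSketch {
  static BitSet selectLiveRows(ColumnVector[] cols, int size, String txnString,
      AcidMergeUtils.DeleteEventRegistry deletes) throws IOException {
    BitSet selected = new BitSet(size);
    selected.set(0, size); // both filters below only clear bits, so start with every row selected
    ValidTxnList validTxnList =
        (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
    // Step 1: clear rows written by transactions that are not visible to this reader.
    AcidMergeUtils.findRecordsWithInvalidTransactionIds(cols, size, selected, validTxnList);
    // Step 2: clear rows that a delete delta has removed.
    deletes.findDeletedRecords(cols, size, selected);
    return selected;
  }
}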
+ * @throws IOException + */ + public void close() throws IOException; + } + + /** + * An implementation for DeleteEventRegistry that opens the delete delta files all + * at once, and then uses the sort-merge algorithm to maintain a sorted list of + * delete events. This internally uses the OrcRawRecordMerger and maintains a constant + * amount of memory usage, given the number of delete delta files. Therefore, this + * implementation will be picked up when the memory pressure is high. + */ + public static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { + private OrcRawRecordMerger deleteRecords; + private OrcRawRecordMerger.ReaderKey deleteRecordKey; + private OrcStruct deleteRecordValue; + private boolean isDeleteRecordAvailable = true; + private ValidTxnList validTxnList; + + public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) + throws IOException { + final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); + if (deleteDeltas.length > 0) { + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, + validTxnList, readerOptions, deleteDeltas); + this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); + this.deleteRecordValue = this.deleteRecords.createValue(); + // Initialize the first value in the delete reader. + this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue); + } else { + this.isDeleteRecordAvailable = false; + this.deleteRecordKey = null; + this.deleteRecordValue = null; + this.deleteRecords = null; + } + } + + @Override + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) + throws IOException { + if (!isDeleteRecordAvailable) { + return; + } + + long[] originalTransaction = + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + long[] bucket = + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; + long[] rowId = + cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; + + // The following repeatedX values will be set, if any of the columns are repeating. + long repeatedOriginalTransaction = (originalTransaction != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + long repeatedBucket = (bucket != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; + long repeatedRowId = (rowId != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector[0]; + + + // Get the first valid row in the batch still available. + int firstValidIndex = selectedBitSet.nextSetBit(0); + if (firstValidIndex == -1) { + return; // Everything in the batch has already been filtered out. + } + RecordIdentifier firstRecordIdInBatch = + new RecordIdentifier( + originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction, + bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket, + rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); + + // Get the last valid row in the batch still available. 
+ int lastValidIndex = selectedBitSet.previousSetBit(size - 1); + RecordIdentifier lastRecordIdInBatch = + new RecordIdentifier( + originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, + bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket, + rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId); + + // We must iterate over all the delete records, until we find one record with + // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records. + while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) { + isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); + if (!isDeleteRecordAvailable) return; // exhausted all delete records, return. + } + + // If we are here, then we have established that firstRecordInBatch <= deleteRecord. + // Now continue marking records which have been deleted until we reach the end of the batch + // or we exhaust all the delete records. + + int currIndex = firstValidIndex; + RecordIdentifier currRecordIdInBatch = new RecordIdentifier(); + while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) { + currRecordIdInBatch.setValues( + (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction, + (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket, + (rowId != null) ? rowId[currIndex] : repeatedRowId); + + if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) { + // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted. + selectedBitSet.clear(currIndex); + currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. + } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) { + // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the + // next record in the batch. + // But before that, can we short-circuit and skip the entire batch itself + // by checking if the deleteRecordId > lastRecordInBatch? + if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) { + return; // Yay! We short-circuited, skip everything remaining in the batch and return. + } + currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. + } else { + // We have deleteRecordId < currRecordIdInBatch, we must now move on to find + // next the larger deleteRecordId that can possibly match anything in the batch. + isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); + } + } + } + + @Override + public void close() throws IOException { + if (this.deleteRecords != null) { + this.deleteRecords.close(); + } + } + } + + /** + * An implementation for DeleteEventRegistry that optimizes for performance by loading + * all the delete events into memory at once from all the delete delta files. + * It starts by reading all the delete events through a regular sort merge logic + * into two vectors- one for original transaction id (otid), and the other for row id. + * (In the current version, since the bucket id should be same for all the delete deltas, + * it is not stored). The otids are likely to be repeated very often, as a single transaction + * often deletes thousands of rows. Hence, the otid vector is compressed to only store the + * toIndex and fromIndex ranges in the larger row id vector. Now, querying whether a + * record id is deleted or not, is done by performing a binary search on the + * compressed otid range. 
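Purely illustrative, with hypothetical field names rather than the class's actual internals: a toy version of the two binary searches described above. Five delete events sorted by (otid, rowId) — (5,1) (5,7) (5,9) (8,2) (8,3) — are stored as rowIds = {1, 7, 9, 2, 3} plus one range per distinct otid: 5 maps to [0, 3) and 8 maps to [3, 5).

import java.util.Arrays;

public class CompressedOtidSketch {
  // otids[i], fromIndex[i] (inclusive) and toIndex[i] (exclusive) describe one compressed range.
  static boolean isDeleted(long[] otids, int[] fromIndex, int[] toIndex, long[] rowIds,
      long otid, long rowId) {
    int pos = Arrays.binarySearch(otids, otid);   // search 1: find the range for this otid
    if (pos < 0) {
      return false;                               // no delete event from that transaction
    }
    // search 2: look for the rowId inside this otid's slice of the concatenated rowId vector
    return Arrays.binarySearch(rowIds, fromIndex[pos], toIndex[pos], rowId) >= 0;
  }

  public static void main(String[] args) {
    long[] otids = {5, 8};
    int[] from = {0, 3};
    int[] to = {3, 5};
    long[] rowIds = {1, 7, 9, 2, 3};
    System.out.println(isDeleted(otids, from, to, rowIds, 5, 7)); // true: event (5,7) exists
    System.out.println(isDeleted(otids, from, to, rowIds, 8, 7)); // false: no event (8,7)
  }
}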
If a match is found, then a binary search is then performed on + * the larger rowId vector between the given toIndex and fromIndex. Of course, there is rough + * heuristic that prevents creation of an instance of this class if the memory pressure is high. + * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios. + */ + public static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ColumnizedDeleteEventRegistry.class); + + /** + * A simple wrapper class to hold the (otid, rowId) pair. + */ + static class DeleteRecordKey implements Comparable { + private long originalTransactionId; + private long rowId; + public DeleteRecordKey() { + this.originalTransactionId = -1; + this.rowId = -1; + } + public DeleteRecordKey(long otid, long rowId) { + this.originalTransactionId = otid; + this.rowId = rowId; + } + public void set(long otid, long rowId) { + this.originalTransactionId = otid; + this.rowId = rowId; + } + + @Override + public int compareTo(DeleteRecordKey other) { + if (other == null) { + return -1; + } + if (originalTransactionId != other.originalTransactionId) { + return originalTransactionId < other.originalTransactionId ? -1 : 1; + } + if (rowId != other.rowId) { + return rowId < other.rowId ? -1 : 1; + } + return 0; + } + } + + /** + * This class actually reads the delete delta files in vectorized row batches. + * For every call to next(), it returns the next smallest record id in the file if available. + * Internally, the next() buffers a row batch and maintains an index pointer, reading the + * next batch when the previous batch is exhausted. + */ + static class DeleteReaderValue { + private VectorizedRowBatch batch; + private final RecordReader recordReader; + private int indexPtrInBatch; + private final int bucketForSplit; // The bucket value should be same for all the records. + private final ValidTxnList validTxnList; + + public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, + ValidTxnList validTxnList) throws IOException { + this.recordReader = deleteDeltaReader.rowsOptions(readerOptions); + this.bucketForSplit = bucket; + this.batch = deleteDeltaReader.getSchema().createRowBatch(); + if (!recordReader.nextBatch(batch)) { // Read the first batch. + this.batch = null; // Oh! the first batch itself was null. Close the reader. + } + this.indexPtrInBatch = 0; + this.validTxnList = validTxnList; + } + + public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { + if (batch == null) { + return false; + } + boolean isValidNext = false; + while (!isValidNext) { + if (indexPtrInBatch >= batch.size) { + // We have exhausted our current batch, read the next batch. + if (recordReader.nextBatch(batch)) { + // Whenever we are reading a batch, we must ensure that all the records in the batch + // have the same bucket id as the bucket id of the split. If not, throw exception. + // NOTE: this assertion might not hold, once virtual bucketing is in place. However, + // it should be simple to fix that case. Just replace check for bucket equality with + // a check for valid bucket mapping. Until virtual bucketing is added, it means + // either the split computation got messed up or we found some corrupted records. 
+ long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating) + || (bucketForRecord != bucketForSplit)){ + throw new IOException("Corrupted records with different bucket ids " + + "from the containing bucket file found! Expected bucket id " + + bucketForSplit + ", however found the bucket id " + bucketForRecord); + } + indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning. + } else { + return false; // no more batches to read, exhausted the reader. + } + } + int originalTransactionIndex = + batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch; + long originalTransaction = + ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex]; + long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch]; + int currentTransactionIndex = + batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch; + long currentTransaction = + ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex]; + ++indexPtrInBatch; + if (validTxnList.isTxnValid(currentTransaction)) { + isValidNext = true; + deleteRecordKey.set(originalTransaction, rowId); + } + } + return true; + } + + public void close() throws IOException { + this.recordReader.close(); + } + } + + /** + * A CompressedOtid class stores a compressed representation of the original + * transaction ids (otids) read from the delete delta files. Since the record ids + * are sorted by (otid, rowId) and otids are highly likely to be repetitive, it is + * efficient to compress them as a CompressedOtid that stores the fromIndex and + * the toIndex. These fromIndex and toIndex reference the larger vector formed by + * concatenating the correspondingly ordered rowIds. + */ + private class CompressedOtid implements Comparable { + long originalTransactionId; + int fromIndex; // inclusive + int toIndex; // exclusive + + public CompressedOtid(long otid, int fromIndex, int toIndex) { + this.originalTransactionId = otid; + this.fromIndex = fromIndex; + this.toIndex = toIndex; + } + + @Override + public int compareTo(CompressedOtid other) { + // When comparing the CompressedOtid, the one with the lesser value is smaller. + if (originalTransactionId != other.originalTransactionId) { + return originalTransactionId < other.originalTransactionId ? -1 : 1; + } + return 0; + } + } + + private TreeMap sortMerger; + private long rowIds[]; + private CompressedOtid compressedOtids[]; + private ValidTxnList validTxnList; + + public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, + Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + this.validTxnList = (txnString == null) ? 
new ValidReadTxnList() : new ValidReadTxnList(txnString); + this.sortMerger = new TreeMap(); + this.rowIds = null; + this.compressedOtids = null; + int maxEventsInMemory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); + + try { + final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); + if (deleteDeltaDirs.length > 0) { + int totalDeleteEventCount = 0; + for (Path deleteDeltaDir : deleteDeltaDirs) { + Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket); + FileSystem fs = deleteDeltaFile.getFileSystem(conf); + // NOTE: Calling last flush length below is more for future-proofing when we have + // streaming deletes. But currently we don't support streaming deletes, and this can + // be removed if this becomes a performance issue. + long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile); + // NOTE: A check for existence of deleteDeltaFile is required because we may not have + // deletes for the bucket being taken into consideration for this split processing. + if (length != -1 && fs.exists(deleteDeltaFile)) { + Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, + OrcFile.readerOptions(conf).maxLength(length)); + AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader); + if (acidStats.deletes == 0) { + continue; // just a safe check to ensure that we are not reading empty delete files. + } + totalDeleteEventCount += acidStats.deletes; + if (totalDeleteEventCount > maxEventsInMemory) { + // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas + // into memory. To prevent out-of-memory errors, this check is a rough heuristic that + // prevents creation of an object of this class if the total number of delete events + // exceed this value. By default, it has been set to 10 million delete events per bucket. + LOG.info("Total number of delete events exceeds the maximum number of delete events " + + "that can be loaded into memory for the delete deltas in the directory at : " + + deleteDeltaDirs.toString() +". The max limit is currently set at " + + maxEventsInMemory + " and can be changed by setting the Hive config variable " + + HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname); + throw new DeleteEventsOverflowMemoryException(); + } + DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, + readerOptions, bucket, validTxnList); + DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); + if (deleteReaderValue.next(deleteRecordKey)) { + sortMerger.put(deleteRecordKey, deleteReaderValue); + } else { + deleteReaderValue.close(); + } + } + } + if (totalDeleteEventCount > 0) { + // Initialize the rowId array when we have some delete events. + rowIds = new long[totalDeleteEventCount]; + readAllDeleteEventsFromDeleteDeltas(); + } + } + } catch(IOException|DeleteEventsOverflowMemoryException e) { + close(); // close any open readers, if there was some exception during initialization. + throw e; // rethrow the exception so that the caller can handle. + } + } + + private void readAllDeleteEventsFromDeleteDeltas() throws IOException { + if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read. + int distinctOtids = 0; + long lastSeenOtid = -1; + long otids[] = new long[rowIds.length]; + int index = 0; + while (!sortMerger.isEmpty()) { + // The sortMerger is a heap data structure that stores a pair of + // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey. 
+ // The deleteReaderValue is the actual wrapper class that has the reference to the + // underlying delta file that is being read, and its corresponding deleteRecordKey + // is the smallest record id for that file. In each iteration of this loop, we extract(poll) + // the minimum deleteRecordKey pair. Once we have processed that deleteRecordKey, we + // advance the pointer for the corresponding deleteReaderValue. If the underlying file + // itself has no more records, then we remove that pair from the heap, or else we + // add the updated pair back to the heap. + Map.Entry entry = sortMerger.pollFirstEntry(); + DeleteRecordKey deleteRecordKey = entry.getKey(); + DeleteReaderValue deleteReaderValue = entry.getValue(); + otids[index] = deleteRecordKey.originalTransactionId; + rowIds[index] = deleteRecordKey.rowId; + ++index; + if (lastSeenOtid != deleteRecordKey.originalTransactionId) { + ++distinctOtids; + lastSeenOtid = deleteRecordKey.originalTransactionId; + } + if (deleteReaderValue.next(deleteRecordKey)) { + sortMerger.put(deleteRecordKey, deleteReaderValue); + } else { + deleteReaderValue.close(); // Exhausted reading all records, close the reader. + } + } + + // Once we have processed all the delete events and seen all the distinct otids, + // we compress the otids into CompressedOtid data structure that records + // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid. + this.compressedOtids = new CompressedOtid[distinctOtids]; + lastSeenOtid = otids[0]; + int fromIndex = 0, pos = 0; + for (int i = 1; i < otids.length; ++i) { + if (otids[i] != lastSeenOtid) { + compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i); + lastSeenOtid = otids[i]; + fromIndex = i; + ++pos; + } + } + // account for the last distinct otid + compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length); + } + + private boolean isDeleted(long otid, long rowId) { + if (compressedOtids == null || rowIds == null) { + return false; + } + // To find if a given (otid, rowId) pair is deleted or not, we perform + // two binary searches at most. The first binary search is on the + // compressed otids. If a match is found, only then we do the next + // binary search in the larger rowId vector between the given toIndex & fromIndex. + + // Check if otid is outside the range of all otids present. + if (otid < compressedOtids[0].originalTransactionId + || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) { + return false; + } + // Create a dummy key for searching the otid in the compressed otid ranges. + CompressedOtid key = new CompressedOtid(otid, -1, -1); + int pos = Arrays.binarySearch(compressedOtids, key); + if (pos >= 0) { + // Otid with the given value found! Searching now for rowId... + key = compressedOtids[pos]; // Retrieve the actual CompressedOtid that matched. + // Check if rowId is outside the range of all rowIds present for this otid. + if (rowId < rowIds[key.fromIndex] + || rowId > rowIds[key.toIndex - 1]) { + return false; + } + if (Arrays.binarySearch(rowIds, key.fromIndex, key.toIndex, rowId) >= 0) { + return true; // rowId also found! + } + } + return false; + } + + @Override + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) + throws IOException { + if (rowIds == null || compressedOtids == null) { + return; + } + // Iterate through the batch and for each (otid, rowid) in the batch + // check if it is deleted or not. 
+ + long[] originalTransactionVector = + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + + long[] rowIdVector = + ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; + + for (int setBitIndex = selectedBitSet.nextSetBit(0); + setBitIndex >= 0; + setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { + long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex] + : repeatedOriginalTransaction ; + long rowId = rowIdVector[setBitIndex]; + if (isDeleted(otid, rowId)) { + selectedBitSet.clear(setBitIndex); + } + } + } + + @Override + public void close() throws IOException { + // ColumnizedDeleteEventRegistry reads all the delete events into memory during initialization + // and it closes the delete event readers after it. If an exception gets thrown during + // initialization, we may have to close any readers that are still left open. + while (!sortMerger.isEmpty()) { + Map.Entry entry = sortMerger.pollFirstEntry(); + entry.getValue().close(); // close the reader for this entry + } + } + } + + public static class DeleteEventsOverflowMemoryException extends Exception { + private static final long serialVersionUID = 1L; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 51530ac..e565447 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -278,7 +278,7 @@ public void configure(JobConf job) { throw new HiveException("Error creating SerDe for LLAP IO", e); } } - InputFormat wrappedIf = llapIo.getInputFormat(inputFormat, serde); + InputFormat wrappedIf = llapIo.getInputFormat(inputFormat, serde, conf); if (wrappedIf == null) { return inputFormat; // We cannot wrap; the cause is logged inside. } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 369584b..a8d4945 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidMergeUtils; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileInfo; @@ -210,6 +211,20 @@ public boolean isAcidRead(Configuration conf, InputSplit inputSplit) { return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); } + public boolean isAcidRead(Configuration conf) { + /* + * If OrcSplit.isAcid returns true, we know for sure it is ACID. 
+ */ + // if (((OrcSplit) inputSplit).isAcid()) { + // return true; + // } + + /* + * Fallback for the case when OrcSplit flags do not contain hasBase and deltas + */ + return HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); + } + private static class OrcRecordReader implements org.apache.hadoop.mapred.RecordReader, StatsProvidingRecordReader { @@ -1844,8 +1859,7 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte reporter.setStatus(inputSplit.toString()); - boolean isFastVectorizedReaderAvailable = - VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit); + boolean isFastVectorizedReaderAvailable = AcidMergeUtils.isAcid(conf, inputSplit); if (vectorMode && isFastVectorizedReaderAvailable) { // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching. @@ -2030,7 +2044,7 @@ static Path findOriginalBucket(FileSystem fs, directory); } - static Reader.Options createOptionsForReader(Configuration conf) { + public static Reader.Options createOptionsForReader(Configuration conf) { /** * Do we have schema on read in the configuration variables? */ diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index dd53afa..424cc7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -74,7 +74,7 @@ * reader is collapsing events to just the last update, just the first * instance of each record is required. */ - final static class ReaderKey extends RecordIdentifier{ + public final static class ReaderKey extends RecordIdentifier{ private long currentTransactionId; private int statementId;//sort on this descending, like currentTransactionId @@ -390,7 +390,7 @@ private void discoverKeyBounds(Reader reader, * @param options options for the row reader * @return a cloned options object that is modified for the event reader */ - static Reader.Options createEventOptions(Reader.Options options) { + public static Reader.Options createEventOptions(Reader.Options options) { Reader.Options result = options.clone(); result.range(options.getOffset(), Long.MAX_VALUE); result.include(options.getInclude()); @@ -417,7 +417,7 @@ private void discoverKeyBounds(Reader reader, * @param deltaDirectory the list of delta directories to include * @throws IOException */ - OrcRawRecordMerger(Configuration conf, + public OrcRawRecordMerger(Configuration conf, boolean collapseEvents, Reader reader, boolean isOriginal, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 65f4a24..d36bd93 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; @@ -66,13 +65,13 @@ final static int UPDATE_OPERATION = 1; final static int 
DELETE_OPERATION = 2; - final static int OPERATION = 0; - final static int ORIGINAL_TRANSACTION = 1; - final static int BUCKET = 2; - final static int ROW_ID = 3; - final static int CURRENT_TRANSACTION = 4; - final static int ROW = 5; - final static int FIELDS = 6; + public final static int OPERATION = 0; + public final static int ORIGINAL_TRANSACTION = 1; + public final static int BUCKET = 2; + public final static int ROW_ID = 3; + public final static int CURRENT_TRANSACTION = 4; + public final static int ROW = 5; + public final static int FIELDS = 6; final static int DELTA_BUFFER_SIZE = 16 * 1024; final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index d61b24b..eb6231f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -218,7 +218,7 @@ public long getColumnarProjectionSize() { @Override public boolean canUseLlapIo() { - return isOriginal && (deltas == null || deltas.isEmpty()); + return isOriginal; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680..67bdf0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -19,36 +19,22 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.Arrays; import java.util.BitSet; -import java.util.List; -import java.util.Map.Entry; -import java.util.TreeMap; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.AcidMergeUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; -import org.apache.orc.impl.AcidStats; -import org.apache.orc.impl.OrcAcidUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; /** @@ -62,8 +48,6 @@ public class VectorizedOrcAcidRowBatchReader implements org.apache.hadoop.mapred.RecordReader { - private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class); - private org.apache.hadoop.hive.ql.io.orc.RecordReader baseReader; private VectorizedRowBatchCtx rbCtx; private VectorizedRowBatch vectorizedRowBatchBase; @@ -73,7 +57,7 @@ private Object[] partitionValues; private boolean addPartitionCols = true; private ValidTxnList validTxnList; - private DeleteEventRegistry deleteEventRegistry; + private 
AcidMergeUtils.DeleteEventRegistry deleteEventRegistry; public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { @@ -133,49 +117,12 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, deleteEventReaderOptions.searchArgument(null, null); try { // See if we can load all the delete events from all the delete deltas in memory... - this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); - } catch (DeleteEventsOverflowMemoryException e) { + this.deleteEventRegistry = new AcidMergeUtils.ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + } catch (AcidMergeUtils.DeleteEventsOverflowMemoryException e) { // If not, then create a set of hanging readers that do sort-merge to find the next smallest // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). - this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); - } - } - - /** - * Returns whether it is possible to create a valid instance of this class for a given split. - * @param conf is the job configuration - * @param inputSplit - * @return true if it is possible, else false. - */ - public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) { - if (!(inputSplit instanceof OrcSplit)) { - return false; // must be an instance of OrcSplit. - } - // First check if we are reading any original files in the split. - // To simplify the vectorization logic, the vectorized acid row batch reader does not handle - // original files for now as they have a different schema than a regular ACID file. - final OrcSplit split = (OrcSplit) inputSplit; - if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) { - // When split-update is turned on for ACID, a more optimized vectorized batch reader - // can be created. But still only possible when we are *NOT* reading any originals. - return true; - } - return false; // no split-update or possibly reading originals! - } - - private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { - Path path = orcSplit.getPath(); - Path root; - if (orcSplit.hasBase()) { - if (orcSplit.isOriginal()) { - root = path.getParent(); - } else { - root = path.getParent().getParent(); - } - } else { - root = path; + this.deleteEventRegistry = new AcidMergeUtils.SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); } - return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas()); } @Override @@ -221,10 +168,12 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + AcidMergeUtils.findRecordsWithInvalidTransactionIds( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet, validTxnList); // Case 2- find rows which have been deleted. - this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet); + this.deleteEventRegistry.findDeletedRecords( + vectorizedRowBatchBase.cols, vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { // None of the cases above matched and everything is selected. 
Hence, we will use the @@ -257,30 +206,6 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti return true; } - private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { - if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { - // When we have repeating values, we can unset the whole bitset at once - // if the repeating value is not a valid transaction. - long currentTransactionIdForBatch = ((LongColumnVector) - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; - if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { - selectedBitSet.clear(0, batch.size); - } - return; - } - long[] currentTransactionVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; - // Loop through the bits that are set to true and mark those rows as false, if their - // current transactions are not valid. - for (int setBitIndex = selectedBitSet.nextSetBit(0); - setBitIndex >= 0; - setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { - if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) { - selectedBitSet.clear(setBitIndex); - } - } - } - @Override public NullWritable createKey() { return NullWritable.get(); @@ -311,512 +236,8 @@ public float getProgress() throws IOException { } @VisibleForTesting - DeleteEventRegistry getDeleteEventRegistry() { + AcidMergeUtils.DeleteEventRegistry getDeleteEventRegistry() { return deleteEventRegistry; } - /** - * An interface that can determine which rows have been deleted - * from a given vectorized row batch. Implementations of this interface - * will read the delete delta files and will create their own internal - * data structures to maintain record ids of the records that got deleted. - */ - static interface DeleteEventRegistry { - /** - * Modifies the passed bitset to indicate which of the rows in the batch - * have been deleted. Assumes that the batch.size is equal to bitset size. - * @param batch - * @param selectedBitSet - * @throws IOException - */ - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException; - - /** - * The close() method can be called externally to signal the implementing classes - * to free up resources. - * @throws IOException - */ - public void close() throws IOException; - } - - /** - * An implementation for DeleteEventRegistry that opens the delete delta files all - * at once, and then uses the sort-merge algorithm to maintain a sorted list of - * delete events. This internally uses the OrcRawRecordMerger and maintains a constant - * amount of memory usage, given the number of delete delta files. Therefore, this - * implementation will be picked up when the memory pressure is high. - */ - static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { - private OrcRawRecordMerger deleteRecords; - private OrcRawRecordMerger.ReaderKey deleteRecordKey; - private OrcStruct deleteRecordValue; - private boolean isDeleteRecordAvailable = true; - private ValidTxnList validTxnList; - - public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) - throws IOException { - final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); - if (deleteDeltas.length > 0) { - int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); - String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); - this.validTxnList = (txnString == null) ? 
new ValidReadTxnList() : new ValidReadTxnList(txnString); - this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, - validTxnList, readerOptions, deleteDeltas); - this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); - this.deleteRecordValue = this.deleteRecords.createValue(); - // Initialize the first value in the delete reader. - this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue); - } else { - this.isDeleteRecordAvailable = false; - this.deleteRecordKey = null; - this.deleteRecordValue = null; - this.deleteRecords = null; - } - } - - @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) - throws IOException { - if (!isDeleteRecordAvailable) { - return; - } - - long[] originalTransaction = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; - long[] bucket = - batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector; - long[] rowId = - batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; - - // The following repeatedX values will be set, if any of the columns are repeating. - long repeatedOriginalTransaction = (originalTransaction != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; - long repeatedBucket = (bucket != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; - long repeatedRowId = (rowId != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0]; - - - // Get the first valid row in the batch still available. - int firstValidIndex = selectedBitSet.nextSetBit(0); - if (firstValidIndex == -1) { - return; // Everything in the batch has already been filtered out. - } - RecordIdentifier firstRecordIdInBatch = - new RecordIdentifier( - originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction, - bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket, - rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); - - // Get the last valid row in the batch still available. - int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1); - RecordIdentifier lastRecordIdInBatch = - new RecordIdentifier( - originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, - bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket, - rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId); - - // We must iterate over all the delete records, until we find one record with - // deleteRecord >= firstRecordInBatch or until we exhaust all the delete records. - while (deleteRecordKey.compareRow(firstRecordIdInBatch) == -1) { - isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); - if (!isDeleteRecordAvailable) return; // exhausted all delete records, return. - } - - // If we are here, then we have established that firstRecordInBatch <= deleteRecord. - // Now continue marking records which have been deleted until we reach the end of the batch - // or we exhaust all the delete records. 
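// A minimal, self-contained sketch of the sort-merge marking pass described above
// (illustrative only: plain long keys stand in for RecordIdentifier/ReaderKey, both
// sequences are assumed sorted ascending, and the bit index doubles as the row index).
static void markDeleted(long[] batchKeys, long[] deleteKeys, java.util.BitSet selected) {
  int row = selected.nextSetBit(0);
  int del = 0;
  while (row >= 0 && del < deleteKeys.length) {
    if (deleteKeys[del] == batchKeys[row]) {
      selected.clear(row);                  // a delete event matches this row
      row = selected.nextSetBit(row + 1);
    } else if (deleteKeys[del] > batchKeys[row]) {
      row = selected.nextSetBit(row + 1);   // no remaining delete can match this row
    } else {
      del++;                                // delete event is behind; advance it
    }
  }
}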
- - int currIndex = firstValidIndex; - RecordIdentifier currRecordIdInBatch = new RecordIdentifier(); - while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) { - currRecordIdInBatch.setValues( - (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction, - (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket, - (rowId != null) ? rowId[currIndex] : repeatedRowId); - - if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) { - // When deleteRecordId == currRecordIdInBatch, this record in the batch has been deleted. - selectedBitSet.clear(currIndex); - currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. - } else if (deleteRecordKey.compareRow(currRecordIdInBatch) == 1) { - // When deleteRecordId > currRecordIdInBatch, we have to move on to look at the - // next record in the batch. - // But before that, can we short-circuit and skip the entire batch itself - // by checking if the deleteRecordId > lastRecordInBatch? - if (deleteRecordKey.compareRow(lastRecordIdInBatch) == 1) { - return; // Yay! We short-circuited, skip everything remaining in the batch and return. - } - currIndex = selectedBitSet.nextSetBit(currIndex + 1); // Move to next valid index. - } else { - // We have deleteRecordId < currRecordIdInBatch, we must now move on to find - // next the larger deleteRecordId that can possibly match anything in the batch. - isDeleteRecordAvailable = deleteRecords.next(deleteRecordKey, deleteRecordValue); - } - } - } - - @Override - public void close() throws IOException { - if (this.deleteRecords != null) { - this.deleteRecords.close(); - } - } - } - - /** - * An implementation for DeleteEventRegistry that optimizes for performance by loading - * all the delete events into memory at once from all the delete delta files. - * It starts by reading all the delete events through a regular sort merge logic - * into two vectors- one for original transaction id (otid), and the other for row id. - * (In the current version, since the bucket id should be same for all the delete deltas, - * it is not stored). The otids are likely to be repeated very often, as a single transaction - * often deletes thousands of rows. Hence, the otid vector is compressed to only store the - * toIndex and fromIndex ranges in the larger row id vector. Now, querying whether a - * record id is deleted or not, is done by performing a binary search on the - * compressed otid range. If a match is found, then a binary search is then performed on - * the larger rowId vector between the given toIndex and fromIndex. Of course, there is rough - * heuristic that prevents creation of an instance of this class if the memory pressure is high. - * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios. - */ - static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { - /** - * A simple wrapper class to hold the (otid, rowId) pair. 
- */ - static class DeleteRecordKey implements Comparable { - private long originalTransactionId; - private long rowId; - public DeleteRecordKey() { - this.originalTransactionId = -1; - this.rowId = -1; - } - public DeleteRecordKey(long otid, long rowId) { - this.originalTransactionId = otid; - this.rowId = rowId; - } - public void set(long otid, long rowId) { - this.originalTransactionId = otid; - this.rowId = rowId; - } - - @Override - public int compareTo(DeleteRecordKey other) { - if (other == null) { - return -1; - } - if (originalTransactionId != other.originalTransactionId) { - return originalTransactionId < other.originalTransactionId ? -1 : 1; - } - if (rowId != other.rowId) { - return rowId < other.rowId ? -1 : 1; - } - return 0; - } - } - - /** - * This class actually reads the delete delta files in vectorized row batches. - * For every call to next(), it returns the next smallest record id in the file if available. - * Internally, the next() buffers a row batch and maintains an index pointer, reading the - * next batch when the previous batch is exhausted. - */ - static class DeleteReaderValue { - private VectorizedRowBatch batch; - private final RecordReader recordReader; - private int indexPtrInBatch; - private final int bucketForSplit; // The bucket value should be same for all the records. - private final ValidTxnList validTxnList; - - public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, - ValidTxnList validTxnList) throws IOException { - this.recordReader = deleteDeltaReader.rowsOptions(readerOptions); - this.bucketForSplit = bucket; - this.batch = deleteDeltaReader.getSchema().createRowBatch(); - if (!recordReader.nextBatch(batch)) { // Read the first batch. - this.batch = null; // Oh! the first batch itself was null. Close the reader. - } - this.indexPtrInBatch = 0; - this.validTxnList = validTxnList; - } - - public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { - if (batch == null) { - return false; - } - boolean isValidNext = false; - while (!isValidNext) { - if (indexPtrInBatch >= batch.size) { - // We have exhausted our current batch, read the next batch. - if (recordReader.nextBatch(batch)) { - // Whenever we are reading a batch, we must ensure that all the records in the batch - // have the same bucket id as the bucket id of the split. If not, throw exception. - // NOTE: this assertion might not hold, once virtual bucketing is in place. However, - // it should be simple to fix that case. Just replace check for bucket equality with - // a check for valid bucket mapping. Until virtual bucketing is added, it means - // either the split computation got messed up or we found some corrupted records. - long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; - if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating) - || (bucketForRecord != bucketForSplit)){ - throw new IOException("Corrupted records with different bucket ids " - + "from the containing bucket file found! Expected bucket id " - + bucketForSplit + ", however found the bucket id " + bucketForRecord); - } - indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning. - } else { - return false; // no more batches to read, exhausted the reader. - } - } - int originalTransactionIndex = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 
0 : indexPtrInBatch; - long originalTransaction = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex]; - long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch]; - int currentTransactionIndex = - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch; - long currentTransaction = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex]; - ++indexPtrInBatch; - if (validTxnList.isTxnValid(currentTransaction)) { - isValidNext = true; - deleteRecordKey.set(originalTransaction, rowId); - } - } - return true; - } - - public void close() throws IOException { - this.recordReader.close(); - } - } - - /** - * A CompressedOtid class stores a compressed representation of the original - * transaction ids (otids) read from the delete delta files. Since the record ids - * are sorted by (otid, rowId) and otids are highly likely to be repetitive, it is - * efficient to compress them as a CompressedOtid that stores the fromIndex and - * the toIndex. These fromIndex and toIndex reference the larger vector formed by - * concatenating the correspondingly ordered rowIds. - */ - private class CompressedOtid implements Comparable { - long originalTransactionId; - int fromIndex; // inclusive - int toIndex; // exclusive - - public CompressedOtid(long otid, int fromIndex, int toIndex) { - this.originalTransactionId = otid; - this.fromIndex = fromIndex; - this.toIndex = toIndex; - } - - @Override - public int compareTo(CompressedOtid other) { - // When comparing the CompressedOtid, the one with the lesser value is smaller. - if (originalTransactionId != other.originalTransactionId) { - return originalTransactionId < other.originalTransactionId ? -1 : 1; - } - return 0; - } - } - - private TreeMap sortMerger; - private long rowIds[]; - private CompressedOtid compressedOtids[]; - private ValidTxnList validTxnList; - - public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, - Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { - int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); - String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); - this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); - this.sortMerger = new TreeMap(); - this.rowIds = null; - this.compressedOtids = null; - int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); - - try { - final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); - if (deleteDeltaDirs.length > 0) { - int totalDeleteEventCount = 0; - for (Path deleteDeltaDir : deleteDeltaDirs) { - Path deleteDeltaFile = AcidUtils.createBucketFile(deleteDeltaDir, bucket); - FileSystem fs = deleteDeltaFile.getFileSystem(conf); - // NOTE: Calling last flush length below is more for future-proofing when we have - // streaming deletes. But currently we don't support streaming deletes, and this can - // be removed if this becomes a performance issue. - long length = OrcAcidUtils.getLastFlushLength(fs, deleteDeltaFile); - // NOTE: A check for existence of deleteDeltaFile is required because we may not have - // deletes for the bucket being taken into consideration for this split processing. 
- if (length != -1 && fs.exists(deleteDeltaFile)) { - Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, - OrcFile.readerOptions(conf).maxLength(length)); - AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader); - if (acidStats.deletes == 0) { - continue; // just a safe check to ensure that we are not reading empty delete files. - } - totalDeleteEventCount += acidStats.deletes; - if (totalDeleteEventCount > maxEventsInMemory) { - // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas - // into memory. To prevent out-of-memory errors, this check is a rough heuristic that - // prevents creation of an object of this class if the total number of delete events - // exceed this value. By default, it has been set to 10 million delete events per bucket. - LOG.info("Total number of delete events exceeds the maximum number of delete events " - + "that can be loaded into memory for the delete deltas in the directory at : " - + deleteDeltaDirs.toString() +". The max limit is currently set at " - + maxEventsInMemory + " and can be changed by setting the Hive config variable " - + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname); - throw new DeleteEventsOverflowMemoryException(); - } - DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, - readerOptions, bucket, validTxnList); - DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); - if (deleteReaderValue.next(deleteRecordKey)) { - sortMerger.put(deleteRecordKey, deleteReaderValue); - } else { - deleteReaderValue.close(); - } - } - } - if (totalDeleteEventCount > 0) { - // Initialize the rowId array when we have some delete events. - rowIds = new long[totalDeleteEventCount]; - readAllDeleteEventsFromDeleteDeltas(); - } - } - } catch(IOException|DeleteEventsOverflowMemoryException e) { - close(); // close any open readers, if there was some exception during initialization. - throw e; // rethrow the exception so that the caller can handle. - } - } - - private void readAllDeleteEventsFromDeleteDeltas() throws IOException { - if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read. - int distinctOtids = 0; - long lastSeenOtid = -1; - long otids[] = new long[rowIds.length]; - int index = 0; - while (!sortMerger.isEmpty()) { - // The sortMerger is a heap data structure that stores a pair of - // (deleteRecordKey, deleteReaderValue) at each node and is ordered by deleteRecordKey. - // The deleteReaderValue is the actual wrapper class that has the reference to the - // underlying delta file that is being read, and its corresponding deleteRecordKey - // is the smallest record id for that file. In each iteration of this loop, we extract(poll) - // the minimum deleteRecordKey pair. Once we have processed that deleteRecordKey, we - // advance the pointer for the corresponding deleteReaderValue. If the underlying file - // itself has no more records, then we remove that pair from the heap, or else we - // add the updated pair back to the heap. 
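// A minimal sketch of the TreeMap-based k-way merge described above (illustrative only:
// the hypothetical Source stands in for DeleteReaderValue, each source must yield keys in
// ascending order, and keys are assumed globally distinct since equal keys would collide
// in the map).
interface Source { Long next(); }           // returns the next key, or null when exhausted

static java.util.List<Long> kWayMerge(java.util.List<Source> sources) {
  java.util.TreeMap<Long, Source> merger = new java.util.TreeMap<>();
  for (Source s : sources) {
    Long k = s.next();
    if (k != null) merger.put(k, s);        // seed with each source's smallest key
  }
  java.util.List<Long> merged = new java.util.ArrayList<>();
  while (!merger.isEmpty()) {
    java.util.Map.Entry<Long, Source> e = merger.pollFirstEntry(); // globally smallest key
    merged.add(e.getKey());
    Long next = e.getValue().next();        // advance the source that was just consumed
    if (next != null) merger.put(next, e.getValue());
  }
  return merged;
}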
- Entry entry = sortMerger.pollFirstEntry(); - DeleteRecordKey deleteRecordKey = entry.getKey(); - DeleteReaderValue deleteReaderValue = entry.getValue(); - otids[index] = deleteRecordKey.originalTransactionId; - rowIds[index] = deleteRecordKey.rowId; - ++index; - if (lastSeenOtid != deleteRecordKey.originalTransactionId) { - ++distinctOtids; - lastSeenOtid = deleteRecordKey.originalTransactionId; - } - if (deleteReaderValue.next(deleteRecordKey)) { - sortMerger.put(deleteRecordKey, deleteReaderValue); - } else { - deleteReaderValue.close(); // Exhausted reading all records, close the reader. - } - } - - // Once we have processed all the delete events and seen all the distinct otids, - // we compress the otids into CompressedOtid data structure that records - // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid. - this.compressedOtids = new CompressedOtid[distinctOtids]; - lastSeenOtid = otids[0]; - int fromIndex = 0, pos = 0; - for (int i = 1; i < otids.length; ++i) { - if (otids[i] != lastSeenOtid) { - compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i); - lastSeenOtid = otids[i]; - fromIndex = i; - ++pos; - } - } - // account for the last distinct otid - compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length); - } - - private boolean isDeleted(long otid, long rowId) { - if (compressedOtids == null || rowIds == null) { - return false; - } - // To find if a given (otid, rowId) pair is deleted or not, we perform - // two binary searches at most. The first binary search is on the - // compressed otids. If a match is found, only then we do the next - // binary search in the larger rowId vector between the given toIndex & fromIndex. - - // Check if otid is outside the range of all otids present. - if (otid < compressedOtids[0].originalTransactionId - || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) { - return false; - } - // Create a dummy key for searching the otid in the compressed otid ranges. - CompressedOtid key = new CompressedOtid(otid, -1, -1); - int pos = Arrays.binarySearch(compressedOtids, key); - if (pos >= 0) { - // Otid with the given value found! Searching now for rowId... - key = compressedOtids[pos]; // Retrieve the actual CompressedOtid that matched. - // Check if rowId is outside the range of all rowIds present for this otid. - if (rowId < rowIds[key.fromIndex] - || rowId > rowIds[key.toIndex - 1]) { - return false; - } - if (Arrays.binarySearch(rowIds, key.fromIndex, key.toIndex, rowId) >= 0) { - return true; // rowId also found! - } - } - return false; - } - - @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) - throws IOException { - if (rowIds == null || compressedOtids == null) { - return; - } - // Iterate through the batch and for each (otid, rowid) in the batch - // check if it is deleted or not. - - long[] originalTransactionVector = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; - long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; - - long[] rowIdVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; - - for (int setBitIndex = selectedBitSet.nextSetBit(0); - setBitIndex >= 0; - setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { - long otid = originalTransactionVector != null ? 
originalTransactionVector[setBitIndex] - : repeatedOriginalTransaction ; - long rowId = rowIdVector[setBitIndex]; - if (isDeleted(otid, rowId)) { - selectedBitSet.clear(setBitIndex); - } - } - } - - @Override - public void close() throws IOException { - // ColumnizedDeleteEventRegistry reads all the delete events into memory during initialization - // and it closes the delete event readers after it. If an exception gets thrown during - // initialization, we may have to close any readers that are still left open. - while (!sortMerger.isEmpty()) { - Entry entry = sortMerger.pollFirstEntry(); - entry.getValue().close(); // close the reader for this entry - } - } - } - - static class DeleteEventsOverflowMemoryException extends Exception { - private static final long serialVersionUID = 1L; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index d4bdd96..2cc5987 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -254,7 +254,7 @@ public void deriveExplainAttributes() { } public void deriveLlap(Configuration conf, boolean isExecDriver) { - boolean hasLlap = false, hasNonLlap = false, hasAcid = false; + boolean hasLlap = false, hasNonLlap = false; // Assume the IO is enabled on the daemon by default. We cannot reasonably check it here. boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode); boolean canWrapAny = false, doCheckIfs = false; @@ -274,12 +274,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) { boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap( part.getInputFileFormatClass(), doCheckIfs); if (isUsingLlapIo) { - if (part.getTableDesc() != null && - AcidUtils.isTablePropertyTransactional(part.getTableDesc().getProperties())) { - hasAcid = true; - } else { - hasLlap = true; - } + hasLlap = true; } else { hasNonLlap = true; } @@ -292,7 +287,7 @@ public void deriveLlap(Configuration conf, boolean isExecDriver) { } llapIoDesc = deriveLlapIoDescString( - isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid); + isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap); } private boolean checkVectorizerSupportedTypes(boolean hasLlap) { @@ -317,11 +312,10 @@ private boolean checkVectorizerSupportedTypes(boolean hasLlap) { } private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, - boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) { + boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap) { if (!isLlapOn) return null; // LLAP IO is off, don't output. if (!canWrapAny) return "no inputs"; // Cannot use with input formats. if (!hasPathToPartInfo) return "unknown"; // No information to judge. - if (hasAcid) return "may be used (ACID table)"; return (hasLlap ? (hasNonLlap ? 
"some inputs" : "all inputs") : "no inputs"); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf1312..2dfe21d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -31,13 +31,12 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.AcidMergeUtils; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; -import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry; -import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.io.LongWritable; @@ -195,13 +194,13 @@ public void setup() throws Exception { @Test public void testVectorizedOrcAcidRowBatchReader() throws Exception { - testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName()); + testVectorizedOrcAcidRowBatchReader(AcidMergeUtils.ColumnizedDeleteEventRegistry.class.getName()); // To test the SortMergedDeleteEventRegistry, we need to explicitly set the // HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY constant to a smaller value. int oldValue = conf.getInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000000); conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, 1000); - testVectorizedOrcAcidRowBatchReader(SortMergedDeleteEventRegistry.class.getName()); + testVectorizedOrcAcidRowBatchReader(AcidMergeUtils.SortMergedDeleteEventRegistry.class.getName()); // Restore the old value. 
conf.setInt(HiveConf.ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname, oldValue); @@ -215,11 +214,11 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) thr conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5 VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL); - if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) { - assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry); + if (deleteEventRegistry.equals(AcidMergeUtils.ColumnizedDeleteEventRegistry.class.getName())) { + assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof AcidMergeUtils.ColumnizedDeleteEventRegistry); } - if (deleteEventRegistry.equals(SortMergedDeleteEventRegistry.class.getName())) { - assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof SortMergedDeleteEventRegistry); + if (deleteEventRegistry.equals(AcidMergeUtils.SortMergedDeleteEventRegistry.class.getName())) { + assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof AcidMergeUtils.SortMergedDeleteEventRegistry); } TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); VectorizedRowBatch vectorizedRowBatch = schema.createRowBatch(); @@ -247,17 +246,17 @@ public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getLegacy().toInt()); // Test false when trying to create a vectorized ACID row batch reader for a legacy table. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertFalse(AcidMergeUtils.isAcid(conf, mockSplit)); conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getDefault().toInt()); Mockito.when(mockSplit.isOriginal()).thenReturn(true); // Test false when trying to create a vectorized ACID row batch reader when reading originals. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertFalse(AcidMergeUtils.isAcid(conf, mockSplit)); // A positive test case. 
Mockito.when(mockSplit.isOriginal()).thenReturn(false); - assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); + assertTrue(AcidMergeUtils.isAcid(conf, mockSplit)); } } diff --git ql/src/test/queries/clientpositive/llap_acid.q ql/src/test/queries/clientpositive/llap_acid.q index 6bd216a..41d86af 100644 --- ql/src/test/queries/clientpositive/llap_acid.q +++ ql/src/test/queries/clientpositive/llap_acid.q @@ -27,7 +27,7 @@ select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limi insert into table orc_llap partition (csmallint = 2) select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10; -alter table orc_llap SET TBLPROPERTIES ('transactional'='true'); +alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default'); insert into table orc_llap partition (csmallint = 3) select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10; diff --git ql/src/test/results/clientpositive/llap_acid.q.out ql/src/test/results/clientpositive/llap_acid.q.out index 5970fd7..e53c3b9 100644 --- ql/src/test/results/clientpositive/llap_acid.q.out +++ ql/src/test/results/clientpositive/llap_acid.q.out @@ -50,11 +50,11 @@ POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc) POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] -PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') PREHOOK: type: ALTERTABLE_PROPERTIES PREHOOK: Input: default@orc_llap PREHOOK: Output: default@orc_llap -POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') POSTHOOK: type: ALTERTABLE_PROPERTIES POSTHOOK: Input: default@orc_llap POSTHOOK: Output: default@orc_llap @@ -105,7 +105,7 @@ STAGE PLANS: Statistics: Num rows: 20 Data size: 296 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) @@ -230,7 +230,7 @@ STAGE PLANS: Statistics: Num rows: 20 Data size: 296 Basic stats: COMPLETE Column stats: NONE value expressions: _col2 (type: bigint) Execution mode: vectorized - LLAP IO: may be used (ACID table) + LLAP IO: all inputs Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint)
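The test above now delegates the split check to AcidMergeUtils.isAcid(conf, split), whose body is not part of this patch. Assuming it keeps the logic of the removed canCreateVectorizedAcidRowBatchReaderOnSplit, the rule the assertions exercise is roughly the sketch below (hypothetical method name); this also appears to be why llap_acid.q now sets 'transactional_properties'='default', which enables split-update so the plan can report "LLAP IO: all inputs" instead of the old ACID special case.

    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    // Sketch only: vectorized ACID reads need an OrcSplit from a split-update table
    // that contains no original files.
    static boolean isVectorizedAcidReadPossible(JobConf conf, InputSplit split) {
      if (!(split instanceof OrcSplit)) {
        return false;
      }
      OrcSplit orcSplit = (OrcSplit) split;
      return AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate()
          && !orcSplit.isOriginal();
    }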