diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 348e07bbf8..f41eb4b2df 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1858,7 +1858,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + "into memory to optimize for performance. To prevent out-of-memory errors, this is a rough heuristic\n" + "that limits the total number of delete events that can be loaded into memory at once.\n" + "Roughly it has been set to 10 million delete events per bucket (~160 MB).\n"), - + FILTER_DELETE_EVENTS("hive.txn.filter.delete.events", true, + "If true, VectorizedOrcAcidRowBatchReader will compute min/max " + + "ROW__ID for the split and only load delete events in that range.\n" + ), HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0, "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 9593975c6c..88e92ff11f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -983,7 +983,7 @@ public void process(Object row, int tag) throws HiveException { int writerOffset; // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same // for a given operator branch prediction should work quite nicely on it. - // RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we + // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we // pass the row rather than recordValue. if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { rowOutWriters[findWriterOffset(row)].write(recordValue); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index ea7ba53a3a..e38e21a3d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -176,6 +176,7 @@ protected int compareToInternal(RecordIdentifier other) { @Override public int compareTo(RecordIdentifier other) { if (other.getClass() != RecordIdentifier.class) { + //WTF? assumes that other instanceof OrcRawRecordMerger.ReaderKey??? return -other.compareTo(this); } return compareToInternal(other); @@ -219,17 +220,15 @@ public int hashCode() { @Override public String toString() { - BucketCodec codec = - BucketCodec.determineVersion(bucketId); - String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) + - "." + codec.decodeStatementId(bucketId) + ")"; - return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}"; + return "RecordIdentifier(" + writeId + ", " + bucketToString(bucketId) + "," + + getRowId() +")"; } - protected String bucketToString() { - if (bucketId == -1) return ("bucket: " + bucketId); + public static String bucketToString(int bucketId) { + if (bucketId == -1) return "" + bucketId; BucketCodec codec = BucketCodec.determineVersion(bucketId); - return "bucket: " + bucketId + "(" + codec.getVersion() + "." + - codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) + ")"; + return bucketId + "(" + codec.getVersion() + "." 
+ + codec.decodeWriterId(bucketId) + "." + + codec.decodeStatementId(bucketId) + ")"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 929ea9b1ed..6be0c74f4e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -20,10 +20,10 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TreeMap; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -33,7 +33,6 @@ import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.SchemaEvolution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -168,7 +167,8 @@ int compareRow(RecordIdentifier other) { @Override public String toString() { return "{originalWriteId: " + getWriteId() + ", " + - bucketToString() + ", row: " + getRowId() + ", currentWriteId " + currentWriteId + "}"; + bucketToString(getBucketProperty()) + ", row: " + getRowId() + + ", currentWriteId " + currentWriteId + "}"; } } interface ReaderPair { @@ -664,20 +664,37 @@ private Reader advanceToNextFile() throws IOException { // The key of the next lowest reader. private ReaderKey secondaryKey = null; - - private static final class KeyInterval { + static final class KeyInterval { private final RecordIdentifier minKey; private final RecordIdentifier maxKey; - private KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { + KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { this.minKey = minKey; this.maxKey = maxKey; } - private RecordIdentifier getMinKey() { + RecordIdentifier getMinKey() { return minKey; } - private RecordIdentifier getMaxKey() { + RecordIdentifier getMaxKey() { return maxKey; + }; + @Override + public String toString() { + return "KeyInterval[" + minKey + "," + maxKey + "]"; } + @Override + public boolean equals(Object other) { + if(!(other instanceof KeyInterval)) { + return false; + } + KeyInterval otherInterval = (KeyInterval)other; + return Objects.equals(minKey, otherInterval.getMinKey()) && + Objects.equals(maxKey, otherInterval.getMaxKey()); + } + @Override + public int hashCode() { + return Objects.hash(minKey, maxKey); + } + } /** * Find the key range for original bucket files. 
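Note: the KeyInterval type added above is what the VectorizedOrcAcidRowBatchReader changes in the next file build on: each split is assigned a [minKey, maxKey] range of ROW__IDs, and only delete events falling inside that range need to be loaded into memory. The following minimal sketch illustrates just that containment check; AcidKey and KeyIntervalSketch are illustrative stand-ins for Hive's RecordIdentifier and OrcRawRecordMerger.KeyInterval, not classes from this patch.

    import java.util.Comparator;

    // Stand-in for RecordIdentifier: delete events are identified and ordered
    // by (originalWriteId, bucketProperty, rowId).
    final class AcidKey {
      final long originalWriteId;
      final int bucketProperty;
      final long rowId;

      AcidKey(long originalWriteId, int bucketProperty, long rowId) {
        this.originalWriteId = originalWriteId;
        this.bucketProperty = bucketProperty;
        this.rowId = rowId;
      }

      static final Comparator<AcidKey> ORDER = Comparator
          .comparingLong((AcidKey k) -> k.originalWriteId)
          .thenComparingInt(k -> k.bucketProperty)
          .thenComparingLong(k -> k.rowId);
    }

    // Stand-in for OrcRawRecordMerger.KeyInterval: a null bound means the
    // interval is unbounded on that side, which is how "range could not be
    // determined" (new KeyInterval(null, null)) degrades to loading everything.
    final class KeyIntervalSketch {
      private final AcidKey minKey;
      private final AcidKey maxKey;

      KeyIntervalSketch(AcidKey minKey, AcidKey maxKey) {
        this.minKey = minKey;
        this.maxKey = maxKey;
      }

      /** @return true if the delete event identified by key may affect the split */
      boolean contains(AcidKey key) {
        if (minKey != null && AcidKey.ORDER.compare(key, minKey) < 0) {
          return false; // event sorts before the first ROW__ID of the split
        }
        if (maxKey != null && AcidKey.ORDER.compare(key, maxKey) > 0) {
          return false; // event sorts after the last ROW__ID of the split
        }
        return true;
      }
    }

This is the same check that DeleteReaderValue.isDeleteEventInRange() performs below with RecordIdentifier.compareTo(); the SearchArgument set on the delete_delta readers in setSARG() pushes a coarser, column-wise form of the same bounds down to ORC row-group filtering.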
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 1841cfaa2e..d4ed11ea52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -44,11 +44,19 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.IntegerColumnStatistics; +import org.apache.orc.OrcConf; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; @@ -99,6 +107,12 @@ * To have access to {@link RecordReader#getRowNumber()} in the underlying file */ private RecordReader innerReader; + /** + * min/max ROW__ID for the split (if available) so that we can limit the + * number of delete events to load in memory + */ + private final OrcRawRecordMerger.KeyInterval keyInterval; + VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { @@ -110,7 +124,7 @@ this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false); - final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); + final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. innerReader = reader.rowsOptions(readerOptions.range(offset, length), conf); baseReader = new org.apache.hadoop.mapred.RecordReader() { @@ -197,20 +211,23 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("VectorizedOrcAcidRowBatchReader:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); + LOG.info("Read ValidWriteIdList: " + this.validWriteIdList.toString() + + ":" + orcSplit); + + keyInterval = findMinMaxKeys(orcSplit, conf); // Clone readerOptions for deleteEvents. Reader.Options deleteEventReaderOptions = readerOptions.clone(); // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because // we always want to read all the delete delta files. deleteEventReaderOptions.range(0, Long.MAX_VALUE); - // Disable SARGs for deleteEventReaders, as SARGs have no meaning. - deleteEventReaderOptions.searchArgument(null, null); + setSARG(keyInterval, deleteEventReaderOptions); DeleteEventRegistry der; try { - // See if we can load all the delete events from all the delete deltas in memory... 
- der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + // See if we can load all the relevant delete events from all the + // delete deltas in memory... + der = new ColumnizedDeleteEventRegistry(conf, orcSplit, + deleteEventReaderOptions, keyInterval); } catch (DeleteEventsOverflowMemoryException e) { // If not, then create a set of hanging readers that do sort-merge to find the next smallest // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). @@ -232,6 +249,47 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList); } + /** + * Generates a SearchArgument to push down to delete_delta files. + */ + private static void setSARG(OrcRawRecordMerger.KeyInterval keyInterval, + Reader.Options deleteEventReaderOptions) { + SearchArgument.Builder b = null; + if(keyInterval.getMinKey() != null) { + RecordIdentifier k = keyInterval.getMinKey(); + b = SearchArgumentFactory.newBuilder(); + b.startAnd() //not(ot < 7) -> ot >=7 + .startNot().lessThan("originalTransaction", + PredicateLeaf.Type.LONG, k.getWriteId()).end() + .startNot().lessThan("bucket", + PredicateLeaf.Type.LONG, (long) k.getBucketProperty()).end() + .startNot().lessThan("rowId", + PredicateLeaf.Type.LONG, k.getRowId()).end() + .end(); + } + if(keyInterval.getMaxKey() != null) { + RecordIdentifier k = keyInterval.getMaxKey(); + if(b == null) { + b = SearchArgumentFactory.newBuilder(); + } + b.startAnd() + .lessThanEquals("originalTransaction", + PredicateLeaf.Type.LONG, k.getWriteId()) + .lessThanEquals("bucket", + PredicateLeaf.Type.LONG, (long) k.getBucketProperty()) + .lessThanEquals("rowId", + PredicateLeaf.Type.LONG, k.getRowId()) + .end(); + } + if(b != null) { + SearchArgument sarg = b.build(); + LOG.debug("deleteReader SARG(" + sarg + ") "); + deleteEventReaderOptions.searchArgument(sarg, + new String[] {"originalTransaction", "bucket", "rowId"}); + return; + } + deleteEventReaderOptions.searchArgument(null, null); + } public void setBaseAndInnerReader( final org.apache.hadoop.mapred.RecordReader baseReader) { this.baseReader = baseReader; @@ -239,6 +297,149 @@ public void setBaseAndInnerReader( this.vectorizedRowBatchBase = baseReader.createValue(); } + /** + * A given ORC reader will always process one or more whole stripes but the + * split boundaries may not line up with stripe boundaries if the InputFormat + * doesn't understand ORC specifics. So first we need to figure out which + * stripe(s) we are reading. + * + * Suppose txn1 writes 100K rows + * and txn2 writes 100 rows so we have events + * {1,0,0}....{1,0,100K},{2,0,0}...{2,0,100} in 2 files + * After compaction we may have 2 stripes + * {1,0,0}...{1,0,90K},{1,0,90001}...{2,0,100} + * + * Now suppose there is a delete stmt that deletes every row. So when we load + * the 2nd stripe, if we just look at stripe {@link ColumnStatistics}, + * minKey={1,0,100} and maxKey={2,0,90001}, all but the 1st 100 delete events + * will get loaded. But with {@link OrcRecordUpdater#ACID_KEY_INDEX_NAME}, + * minKey={1,0,90001} and maxKey={2,0,100} so we only load about 10K deletes. + * + * Also, even with Query Based compactor (once we have it), FileSinkOperator + * uses OrcRecordWriter to write to file, so we should have the + * hive.acid.index in place. 
+ * + * If reading the 1st stripe, we don't have the start event, so we'll get it + * from stats, which will strictly speaking be accurate only wrt writeId and + * bucket but that is good enough. + * + * @return empty KeyInterval if KeyInterval could not be + * determined + */ + private static OrcRawRecordMerger.KeyInterval findMinMaxKeys( + OrcSplit orcSplit, Configuration conf) throws IOException { + if(!HiveConf.getBoolVar(conf, ConfVars.FILTER_DELETE_EVENTS)) { + LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + if(orcSplit.isOriginal()) { + /** + * Among originals we may have files with _copy_N suffix. To properly + * generate a synthetic ROW__ID for them we need + * {@link OffsetAndBucketProperty} which could be an expensive computation + * if there are lots of copy_N files for a given bucketId. But unless + * there are delete events, we often don't need synthetic ROW__IDs at all. + * Kind of chicken-and-egg - deal with this later. + * See {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, + * Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/ + LOG.debug("findMinMaxKeys(original split) - ignoring"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + //todo: since we already have OrcSplit.orcTail, should somehow use it to + // get the acid.index, stats, etc rather than fetching the footer again + // though it seems that orcTail is mostly null.... + Reader reader = OrcFile.createReader(orcSplit.getPath(), + OrcFile.readerOptions(conf)); + List<StripeInformation> stripes = reader.getStripes(); + final long splitStart = orcSplit.getStart(); + final long splitEnd = splitStart + orcSplit.getLength(); + int firstStripeIndex = -1; + int lastStripeIndex = -1; + for(int i = 0; i < stripes.size(); i++) { + StripeInformation stripe = stripes.get(i); + long stripeEnd = stripe.getOffset() + stripe.getLength(); + if(firstStripeIndex == -1 && stripe.getOffset() >= splitStart) { + firstStripeIndex = i; + } + if(lastStripeIndex == -1 && splitEnd <= stripeEnd && + stripes.get(firstStripeIndex).getOffset() <= stripe.getOffset() ) { + //the last condition is for when both splitStart and splitEnd are in + // the same stripe + lastStripeIndex = i; + } + } + if(lastStripeIndex == -1) { + //split goes to the EOF which is > end of stripe since file has a footer + assert stripes.get(stripes.size() - 1).getOffset() + + stripes.get(stripes.size() - 1).getLength() < splitEnd; + lastStripeIndex = stripes.size() - 1; + } + if(firstStripeIndex == -1 || lastStripeIndex == -1) { + LOG.warn("Could not find stripe (" + firstStripeIndex + "," + + lastStripeIndex + ")"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader); + if(keyIndex == null || keyIndex.length != stripes.size()) { + LOG.warn("Could not find keyIndex or length doesn't match (" + + firstStripeIndex + "," + lastStripeIndex + "," + stripes.size() + "," + + (keyIndex == null ?
-1 : keyIndex.length) + ")"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + RecordIdentifier minKey = null; + if(firstStripeIndex > 0) { + //valid keys are strictly > than this key + minKey = keyIndex[firstStripeIndex - 1]; + //add 1 to make comparison >= to match the case of 0th stripe + minKey.setRowId(minKey.getRowId() + 1); + } + else { + List<StripeStatistics> stats = reader.getStripeStatistics(); + assert stripes.size() == stats.size() : "str.s=" + stripes.size() + + " sta.s=" + stats.size(); + ColumnStatistics[] colStats = stats.get(firstStripeIndex).getColumnStatistics(); + /** + * If {@link OrcConf.ROW_INDEX_STRIDE} is set to 0, all column stats in the + * ORC file are disabled; objects for them still exist but have + * min/max set to MIN_LONG/MAX_LONG, so we only use column stats if they + * are actually computed. Streaming ingest and minor compaction used to set + * it to 0, so there are lots of legacy files with no (or rather, bad) + * column stats*/ + if(!(reader.getNumberOfRows() > 0 && + colStats[0].getNumberOfValues() < reader.getNumberOfRows())) { + /* + Structure in data is like this: + <op, owid, writerId, rowid, cwid, <f1, ... fn>> + The +1 is to account for the top level struct which has a + ColumnStatistics object in colsStats. Top level struct is normally + dropped by the Reader (I guess because of orc.impl.SchemaEvolution) + */ + IntegerColumnStatistics origWriteId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1]; + IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.BUCKET + 1]; + IntegerColumnStatistics rowId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ROW_ID + 1]; + //we may want to change bucketProperty from int to long in the + // future (across the stack); this protects the following cast to int + assert bucketProperty.getMinimum() <= Integer.MAX_VALUE : + "was bucketProperty changed to a long (" + + bucketProperty.getMinimum() + ")?!:" + orcSplit; + //this is a lower bound but not necessarily the greatest lower bound + minKey = new RecordIdentifier(origWriteId.getMinimum(), + (int) bucketProperty.getMinimum(), rowId.getMinimum()); + } + else { + LOG.debug("findMinMaxKeys() No ORC column stats"); + } + } + + OrcRawRecordMerger.KeyInterval keyInterval = + new OrcRawRecordMerger.KeyInterval(minKey, keyIndex[lastStripeIndex]); + LOG.info("findMinMaxKeys(): " + keyInterval + + " stripes(" + firstStripeIndex + "," + lastStripeIndex + ")"); + return keyInterval; + } /** * Used for generating synthetic ROW__IDs for reading "original" files */ @@ -627,7 +828,7 @@ DeleteEventRegistry getDeleteEventRegistry() { * will read the delete delta files and will create their own internal * data structures to maintain record ids of the records that got deleted. */ - protected static interface DeleteEventRegistry { + protected interface DeleteEventRegistry { /** * Modifies the passed bitset to indicate which of the rows in the batch * have been deleted. Assumes that the batch.size is equal to bitset size. @@ -657,6 +858,9 @@ DeleteEventRegistry getDeleteEventRegistry() { * delete events. This internally uses the OrcRawRecordMerger and maintains a constant * amount of memory usage, given the number of delete delta files. Therefore, this * implementation will be picked up when the memory pressure is high. + * + * Don't bother to use the KeyInterval from the split here since this doesn't + * buffer delete events in memory. 
*/ static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { private OrcRawRecordMerger deleteRecords; @@ -665,16 +869,15 @@ DeleteEventRegistry getDeleteEventRegistry() { private Boolean isDeleteRecordAvailable = null; private ValidWriteIdList validWriteIdList; - SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) - throws IOException { + SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, + Reader.Options readerOptions) throws IOException { final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltas.length > 0) { int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("SortMergedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); + LOG.debug("Using SortMergedDeleteEventRegistry"); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true); assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly"; this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, @@ -810,8 +1013,10 @@ public void close() throws IOException { static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** * A simple wrapper class to hold the (owid, bucketProperty, rowId) pair. + * todo: why not use {@link RecordIdentifier}?? */ static class DeleteRecordKey implements Comparable { + private static final DeleteRecordKey otherKey = new DeleteRecordKey(); private long originalWriteId; /** * see {@link BucketCodec} @@ -844,9 +1049,18 @@ public int compareTo(DeleteRecordKey other) { } return 0; } + private int compareTo(RecordIdentifier other) { + if (other == null) { + return -1; + } + otherKey.set(other.getWriteId(), other.getBucketProperty(), + other.getRowId()); + return compareTo(otherKey); + } @Override public String toString() { - return "owid: " + originalWriteId + " bucketP:" + bucketProperty + " rowid: " + rowId; + return "DeleteRecordKey(" + originalWriteId + "," + + RecordIdentifier.bucketToString(bucketProperty) + "," + rowId +")"; } } @@ -868,10 +1082,17 @@ public String toString() { private boolean isBucketPropertyRepeating; private final boolean isBucketedTable; private final Reader reader; - - DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, - ValidWriteIdList validWriteIdList, boolean isBucketedTable, final JobConf conf) throws IOException { + private final Path deleteDeltaFile; + private final OrcRawRecordMerger.KeyInterval keyInterval; + private final OrcSplit orcSplit; + + DeleteReaderValue(Reader deleteDeltaReader, Path deleteDeltaFile, + Reader.Options readerOptions, int bucket, ValidWriteIdList validWriteIdList, + boolean isBucketedTable, final JobConf conf, + OrcRawRecordMerger.KeyInterval keyInterval, OrcSplit orcSplit) + throws IOException { this.reader = deleteDeltaReader; + this.deleteDeltaFile = deleteDeltaFile; this.recordReader = deleteDeltaReader.rowsOptions(readerOptions, conf); this.bucketForSplit = bucket; final boolean useDecimal64ColumnVector = HiveConf.getVar(conf, ConfVars @@ -887,7 +1108,11 @@ public String toString() { this.indexPtrInBatch = 0; this.validWriteIdList = validWriteIdList; this.isBucketedTable = isBucketedTable; - checkBucketId();//check 1st 
batch + if(batch != null) { + checkBucketId();//check 1st batch + } + this.keyInterval = keyInterval; + this.orcSplit = orcSplit; } public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { @@ -910,13 +1135,30 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { checkBucketId(deleteRecordKey.bucketProperty); } ++indexPtrInBatch; + if(!isDeleteEventInRange(keyInterval, deleteRecordKey)) { + continue; + } if (validWriteIdList.isWriteIdValid(currentWriteId)) { isValidNext = true; } } return true; } - + static boolean isDeleteEventInRange( + OrcRawRecordMerger.KeyInterval keyInterval, + DeleteRecordKey deleteRecordKey) { + if(keyInterval.getMinKey() != null && + deleteRecordKey.compareTo(keyInterval.getMinKey()) < 0) { + //current deleteEvent is < than minKey + return false; + } + if(keyInterval.getMaxKey() != null && + deleteRecordKey.compareTo(keyInterval.getMaxKey()) > 0) { + //current deleteEvent is > than maxKey + return false; + } + return true; + } public void close() throws IOException { this.recordReader.close(); } @@ -964,11 +1206,11 @@ private void checkBucketId(int bucketPropertyFromRecord) throws IOException { .decodeWriterId(bucketPropertyFromRecord); if(bucketIdFromRecord != bucketForSplit) { DeleteRecordKey dummy = new DeleteRecordKey(); - long curTxnId = setCurrentDeleteKey(dummy); + setCurrentDeleteKey(dummy); throw new IOException("Corrupted records with different bucket ids " - + "from the containing bucket file found! Expected bucket id " - + bucketForSplit + ", however found the bucket id " + bucketIdFromRecord + - " from " + dummy + " curTxnId: " + curTxnId); + + "from the containing bucket file found! Expected bucket id " + + bucketForSplit + ", however found " + dummy + + ". (" + orcSplit + "," + deleteDeltaFile + ")"); } } @@ -1022,26 +1264,34 @@ public int compareTo(CompressedOwid other) { * of bucketIds where each entry points at an array of rowIds. We could probably use ArrayList * to manage insertion as the structure is built (LinkedList?). This should reduce memory * footprint (as far as OrcReader to a single reader) - probably bad for LLAP IO + * Or much simpler, make compaction of delete deltas very aggressive so that + * we never have move than a few delete files to read. */ private TreeMap sortMerger; private long rowIds[]; private CompressedOwid compressedOwids[]; private ValidWriteIdList validWriteIdList; - private Boolean isEmpty = null; + private Boolean isEmpty; + private final int maxEventsInMemory; + private final OrcSplit orcSplit; + ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, - Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + Reader.Options readerOptions, + OrcRawRecordMerger.KeyInterval keyInterval) + throws IOException, DeleteEventsOverflowMemoryException { int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? 
new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("ColumnizedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); - this.sortMerger = new TreeMap(); + LOG.debug("Using ColumnizedDeleteEventRegistry"); + this.sortMerger = new TreeMap<>(); this.rowIds = null; this.compressedOwids = null; - int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); + maxEventsInMemory = HiveConf + .getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); final boolean isBucketedTable = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0; + this.orcSplit = orcSplit; try { final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); @@ -1059,6 +1309,9 @@ public int compareTo(CompressedOwid other) { // NOTE: A check for existence of deleteDeltaFile is required because we may not have // deletes for the bucket being taken into consideration for this split processing. if (length != -1 && fs.exists(deleteDeltaFile)) { + /** + * todo: we have OrcSplit.orcTail so we should be able to get stats from there + */ Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf).maxLength(length)); AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader); @@ -1066,7 +1319,17 @@ public int compareTo(CompressedOwid other) { continue; // just a safe check to ensure that we are not reading empty delete files. } totalDeleteEventCount += acidStats.deletes; - if (totalDeleteEventCount > maxEventsInMemory) { + if (false && totalDeleteEventCount > maxEventsInMemory) { + /*todo: this estimation is suboptimal especially if we have + min/maxKey for the split. The files may have many more + deletes than actually need to be loaded in memory (which is + the point of applying KeyInterval). + When does PPD apply? presumably at read time. + see RecordReaderImpl.SargApplier.pickRowGroups + So we should start loading and count events as we load and + stop if we get too much This means we have resize array + dynamically in readAllDeleteEventsFromDeleteDeltas()*/ + // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas // into memory. To prevent out-of-memory errors, this check is a rough heuristic that // prevents creation of an object of this class if the total number of delete events @@ -1079,7 +1342,8 @@ public int compareTo(CompressedOwid other) { throw new DeleteEventsOverflowMemoryException(); } DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, - readerOptions, bucket, validWriteIdList, isBucketedTable, conf); + deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf, + keyInterval, orcSplit); DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); if (deleteReaderValue.next(deleteRecordKey)) { sortMerger.put(deleteRecordKey, deleteReaderValue); @@ -1089,11 +1353,7 @@ public int compareTo(CompressedOwid other) { } } } - // Note: totalDeleteEventCount can actually be higher than real value. - // We assume here it won't be lower. Maybe we should just read and not guess... 
- if (totalDeleteEventCount > 0) { - readAllDeleteEventsFromDeleteDeltas(totalDeleteEventCount); - } + readAllDeleteEventsFromDeleteDeltas(); } isEmpty = compressedOwids == null || rowIds == null; } catch(IOException|DeleteEventsOverflowMemoryException e) { @@ -1101,7 +1361,28 @@ public int compareTo(CompressedOwid other) { throw e; // rethrow the exception so that the caller can handle. } } - + private void checkSize(int index) throws DeleteEventsOverflowMemoryException { + // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas + // into memory. To prevent out-of-memory errors, this check is a rough heuristic that + // prevents creation of an object of this class if the total number of delete events + // exceed this value. By default, it has been set to 10 million delete events per bucket. + if(index > maxEventsInMemory) { + LOG.info("Total number of delete events exceeds the maximum number of delete events " + + "that can be loaded into memory for " + orcSplit + + ". The max limit is currently set at " + + maxEventsInMemory + " and can be changed by setting the Hive config variable " + + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname); + throw new DeleteEventsOverflowMemoryException(); + } + if(index < rowIds.length) { + return; + } + int newLength = rowIds.length + 1000000; + if(rowIds.length <= 1000000) { + newLength = rowIds.length * 2; + } + rowIds = Arrays.copyOf(rowIds, newLength); + } /** * This is not done quite right. The intent of {@link CompressedOwid} is a hedge against * "delete from T" that generates a huge number of delete events possibly even 2G - max array @@ -1110,15 +1391,14 @@ public int compareTo(CompressedOwid other) { * In practice we should be filtering delete evens by min/max ROW_ID from the split. The later * is also not yet implemented: HIVE-16812. */ - private void readAllDeleteEventsFromDeleteDeltas( - int totalDeleteEventCount) throws IOException { + private void readAllDeleteEventsFromDeleteDeltas() + throws IOException, DeleteEventsOverflowMemoryException { if (sortMerger == null || sortMerger.isEmpty()) { - rowIds = new long[0]; return; // trivial case, nothing to read. } // Initialize the rowId array when we have some delete events. - rowIds = new long[totalDeleteEventCount]; + rowIds = new long[1];//todo: add config? or HIVE_IN_TEST check? int index = 0; // We compress the owids into CompressedOwid data structure that records @@ -1140,6 +1420,7 @@ private void readAllDeleteEventsFromDeleteDeltas( DeleteReaderValue deleteReaderValue = entry.getValue(); long owid = deleteRecordKey.originalWriteId; int bp = deleteRecordKey.bucketProperty; + checkSize(index); rowIds[index] = deleteRecordKey.rowId; if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) { if (lastCo != null) { @@ -1198,6 +1479,12 @@ private boolean isDeleted(long owid, int bucketProperty, long rowId) { } return false; } + /** + * @return how many delete events are actually loaded + */ + int size() { + return rowIds == null ? 
0 : rowIds.length; + } @Override public boolean isEmpty() { if(isEmpty == null) { @@ -1206,8 +1493,7 @@ public boolean isEmpty() { return isEmpty; } @Override - public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) - throws IOException { + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) { if (rowIds == null || compressedOwids == null) { return; } @@ -1258,4 +1544,8 @@ public void close() throws IOException { static class DeleteEventsOverflowMemoryException extends Exception { private static final long serialVersionUID = 1L; } + @VisibleForTesting + OrcRawRecordMerger.KeyInterval getKeyInterval() { + return keyInterval; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index c5589b953d..c117849aa4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -716,7 +716,7 @@ public void testNonAcidToAcidConversion3() throws Exception { int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default) + // 2. Convert NONACIDORCTBL to ACID table runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java new file mode 100644 index 0000000000..ae22ba5149 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -0,0 +1,140 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.orc.TestVectorizedOrcAcidRowBatchReader; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +public class TestTxnCommands3 extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands3.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Override + protected String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Test + public void testRenameTable() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("drop database if exists mydb1 cascade"); + runStatementOnDriver("drop database if exists mydb2 cascade"); + runStatementOnDriver("create database mydb1"); + runStatementOnDriver("create database mydb2"); + runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); + runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); + //put something in WRITE_SET + runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); + runStatementOnDriver("alter table mydb1.T compact 'minor'"); + + runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); + + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; + 
String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "s/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "s/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='s'")); + Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); + + runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar"); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='bar'")); + Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'")); + } + + @Test + public void testDeleteEventPruningOn() throws Exception { + HiveConf.setBoolVar(hiveConf, + HiveConf.ConfVars.FILTER_DELETE_EVENTS, true); + testDeleteEventPruning(); + } + @Test + public void testDeleteEventPruningOff() throws Exception { + HiveConf.setBoolVar(hiveConf, + HiveConf.ConfVars.FILTER_DELETE_EVENTS, false); + testDeleteEventPruning(); + } + /** + * run with and w/o event fitlering enabled - should get the same results + * {@link TestVectorizedOrcAcidRowBatchReader#testDeleteEventFiltering()} + * + * todo: add .q test using VerifyNumReducersHook.num.reducers to make sure + * it does have 1 split for each input file. 
+ * Will need to crate VerifyNumMappersHook + * + * Also, consider + * HiveSplitGenerator.java + * RAW_INPUT_SPLITS and GROUPED_INPUT_SPLITS are the counters before and + * after grouping splits PostExecTezSummaryPrinter post exec hook can be + * used to printout specific counters + */ + private void testDeleteEventPruning() throws Exception { + HiveConf.setBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + dropTable(new String[] {"T"}); + runStatementOnDriver( + "create transactional table T(a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(1,2),(4,5)"); + runStatementOnDriver("insert into T values(1,3),(4,6)"); + runStatementOnDriver("delete from T where a = 1"); + List rs = runStatementOnDriver( + "select ROW__ID, a, b from T order by a, b"); + + boolean isVectorized = + hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); + String testQuery = isVectorized ? + "select ROW__ID, a, b from T order by a, b" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b"; + String[][] expected = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", + "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t6", + "warehouse/t/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, isVectorized, "?", LOG); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index a5bd1cbd67..e882c94cfe 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -176,67 +176,4 @@ public void testConcatenateMM() throws Exception { "t/base_0000002/000000_0"}}; checkResult(expected2, testQuery, false, "check data after concatenate", LOG); } - @Test - public void testRenameTable() throws Exception { - MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); - runStatementOnDriver("drop database if exists mydb1 cascade"); - runStatementOnDriver("drop database if exists mydb2 cascade"); - runStatementOnDriver("create database mydb1"); - runStatementOnDriver("create database mydb2"); - runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); - runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); - //put something in WRITE_SET - runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); - runStatementOnDriver("alter table mydb1.T compact 'minor'"); - - runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); - - String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; - String[][] expected = new String[][] { - {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", - "s/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", - "s/delta_0000002_0000002_0000/bucket_00000"}}; - checkResult(expected, testQuery, false, "check data", LOG); - - - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); - 
Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); - - Assert.assertEquals( - TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, - TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='s'")); - Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); - - runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar"); - - Assert.assertEquals( - TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, - TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='bar'")); - Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'")); - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 05ce3e214d..dab0d982c9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -249,4 +249,9 @@ protected void checkResult(String[][] expectedResult, String query, boolean isVe checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); assertVectorized(isVectorized, query); } + void dropTable(String[] tabs) throws Exception { + for(String tab : tabs) { + d.run("drop table if exists " + tab); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 551e5ca0a6..f4831ae23a 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.util.List; @@ -49,6 +45,8 @@ import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + /** * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set * of delete delta files. 
The split is on an insert delta and there are multiple delete deltas @@ -62,6 +60,7 @@ private JobConf conf; private FileSystem fs; private Path root; + private ObjectInspector inspector; public static class DummyRow { LongWritable field; @@ -90,7 +89,6 @@ static String getColumnTypesProperty() { @Before public void setup() throws Exception { conf = new JobConf(); - conf.set("bucket_count", "1"); conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true); conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default"); @@ -107,11 +105,175 @@ public void setup() throws Exception { fs = root.getFileSystem(conf); root = fs.makeQualified(root); fs.delete(root, true); - ObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = ObjectInspectorFactory.getReflectionObjectInspector (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } + } + @Test + public void testDeleteEventFilteringOff() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false); + testDeleteEventFiltering(); + } + @Test + public void testDeleteEventFilteringOn() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true); + testDeleteEventFiltering(); + } + + /** + * Tests that we can figure out min/max ROW__ID for each split and then use + * that to only load delete events between min/max. + * This test doesn't actually check what is read - that is done more E2E + * unit tests. + * @throws Exception + */ + private void testDeleteEventFiltering() throws Exception { + boolean filterOn = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS); + int bucket = 0; + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .filesystem(fs) + .bucket(bucket) + .writingBase(false) + .minimumWriteId(1) + .maximumWriteId(1) + .inspector(inspector) + .reporter(Reporter.NULL) + .recordIdColumn(1) + .finalDestination(root); + + int bucketProperty = BucketCodec.V1.encode(options); + + //create 3 insert deltas so that we have 3 splits + RecordUpdater updater = new OrcRecordUpdater(root, options); + updater.insert(options.getMinimumWriteId(), + new DummyRow(1, 0, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(2, 1, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(3, 2, options.getMinimumWriteId(), bucket)); + updater.close(false); + + options.minimumWriteId(2) + .maximumWriteId(2); + updater = new OrcRecordUpdater(root, options); + updater.insert(options.getMinimumWriteId(), + new DummyRow(4, 0, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(5, 1, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(6, 2, options.getMinimumWriteId(), bucket)); + updater.close(false); + + options.minimumWriteId(3) + .maximumWriteId(3); + updater = new OrcRecordUpdater(root, options); + updater.insert(options.getMinimumWriteId(), + new DummyRow(7, 0, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(8, 1, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(9, 2, options.getMinimumWriteId(), bucket)); + updater.close(false); + + //delete 1 row from each of the insert deltas + options.minimumWriteId(4) + .maximumWriteId(4); + updater = new 
OrcRecordUpdater(root, options); + updater.delete(options.getMinimumWriteId(), + new DummyRow(-1, 0, 1, bucket)); + updater.delete(options.getMinimumWriteId(), + new DummyRow(-1, 1, 2, bucket)); + updater.delete(options.getMinimumWriteId(), + new DummyRow(-1, 2, 3, bucket)); + updater.close(false); + + //HWM is not important - just make sure deltas created above are read as + // if committed + conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, + "tbl:5:" + Long.MAX_VALUE + "::"); + + //now we have 3 delete events total, but for each split we should only + // load 1 into DeleteRegistry (if filtering is on) + List> splitStrategies = getSplitStrategies(); + assertEquals(1, splitStrategies.size()); + List splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); + + assertEquals(3, splits.size()); + assertEquals(root.toUri().toString() + File.separator + + "delta_0000001_0000001_0000/bucket_00000", + splits.get(0).getPath().toUri().toString()); + assertFalse(splits.get(0).isOriginal()); + + assertEquals(root.toUri().toString() + File.separator + + "delta_0000002_0000002_0000/bucket_00000", + splits.get(1).getPath().toUri().toString()); + assertFalse(splits.get(1).isOriginal()); + + assertEquals(root.toUri().toString() + File.separator + + "delta_0000003_0000003_0000/bucket_00000", + splits.get(2).getPath().toUri().toString()); + assertFalse(splits.get(2).isOriginal()); + + VectorizedOrcAcidRowBatchReader vectorizedReader = + new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, + new VectorizedRowBatchCtx()); + ColumnizedDeleteEventRegistry deleteEventRegistry = + (ColumnizedDeleteEventRegistry) vectorizedReader + .getDeleteEventRegistry(); + assertEquals("number of delete events for stripe 1", filterOn ? 1 : 3, + deleteEventRegistry.size()); + OrcRawRecordMerger.KeyInterval keyInterval = + vectorizedReader.getKeyInterval(); + if(filterOn) { + assertEquals(new OrcRawRecordMerger.KeyInterval( + new RecordIdentifier(1, bucketProperty, 0), + new RecordIdentifier(1, bucketProperty, 2)), + keyInterval); + } + else { + assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval); + } + + vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(1), conf, + Reporter.NULL, new VectorizedRowBatchCtx()); + deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader + .getDeleteEventRegistry(); + assertEquals("number of delete events for stripe 2", filterOn ? 1 : 3, + deleteEventRegistry.size()); + keyInterval = vectorizedReader.getKeyInterval(); + if(filterOn) { + assertEquals(new OrcRawRecordMerger.KeyInterval( + new RecordIdentifier(2, bucketProperty, 0), + new RecordIdentifier(2, bucketProperty, 2)), + keyInterval); + } + else { + assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval); + } + + vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(2), conf, + Reporter.NULL, new VectorizedRowBatchCtx()); + deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader + .getDeleteEventRegistry(); + assertEquals("number of delete events for stripe 3", filterOn ? 
1 : 3, + deleteEventRegistry.size()); + keyInterval = vectorizedReader.getKeyInterval(); + if(filterOn) { + assertEquals(new OrcRawRecordMerger.KeyInterval( + new RecordIdentifier(3, bucketProperty, 0), + new RecordIdentifier(3, bucketProperty, 2)), keyInterval); + } + else { + assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval); + } + } + + @Test + public void testVectorizedOrcAcidRowBatchReader() throws Exception { + conf.set("bucket_count", "1"); + int bucket = 0; AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .filesystem(fs) @@ -177,28 +339,7 @@ public void setup() throws Exception { } } updater.close(false); - } - - private List getSplits() throws Exception { - conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, - AcidUtils.AcidOperationalProperties.getDefault().toInt()); - OrcInputFormat.Context context = new OrcInputFormat.Context(conf); - OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null); - OrcInputFormat.AcidDirInfo adi = gen.call(); - List> splitStrategies = OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, - null, null, true); - assertEquals(1, splitStrategies.size()); - List splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); - assertEquals(1, splits.size()); - assertEquals(root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000", - splits.get(0).getPath().toUri().toString()); - assertFalse(splits.get(0).isOriginal()); - return splits; - } - @Test - public void testVectorizedOrcAcidRowBatchReader() throws Exception { testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName()); // To test the SortMergedDeleteEventRegistry, we need to explicitly set the @@ -213,7 +354,16 @@ public void testVectorizedOrcAcidRowBatchReader() throws Exception { private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) throws Exception { - List splits = getSplits(); + List> splitStrategies = getSplitStrategies(); + assertEquals(1, splitStrategies.size()); + List splits = ((OrcInputFormat.ACIDSplitStrategy) + splitStrategies.get(0)).getSplits(); + assertEquals(1, splits.size()); + assertEquals(root.toUri().toString() + File.separator + + "delta_0000001_0000010_0000/bucket_00000", + splits.get(0).getPath().toUri().toString()); + assertFalse(splits.get(0).isOriginal()); + // Mark one of the transactions as an exception to test that invalid transactions // are being handled properly. conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:14:1:1:5"); // Exclude transaction 5 @@ -244,4 +394,16 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) thr } } } + private List> getSplitStrategies() throws Exception { + conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, + AcidUtils.AcidOperationalProperties.getDefault().toInt()); + OrcInputFormat.Context context = new OrcInputFormat.Context(conf); + OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator( + context, fs, root, false, null); + OrcInputFormat.AcidDirInfo adi = gen.call(); + return OrcInputFormat.determineSplitStrategies( + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents, + null, null, true); + + } }
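As a closing note on the registry changes above, here is a compact sketch of the growth policy that ColumnizedDeleteEventRegistry.checkSize() now applies instead of pre-sizing rowIds from acidStats.deletes: double while the array is small, then grow in fixed 1M steps, and give up once the configured cap is exceeded so the caller can fall back to SortMergedDeleteEventRegistry. GrowableRowIds is an illustrative stand-in, not code from the patch.

    import java.util.Arrays;

    final class GrowableRowIds {
      private static final int INCREMENT = 1_000_000;
      // cap corresponding to ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY
      private final int maxEventsInMemory;
      private long[] rowIds = new long[1];
      private int size = 0;

      GrowableRowIds(int maxEventsInMemory) {
        this.maxEventsInMemory = maxEventsInMemory;
      }

      void add(long rowId) {
        if (size > maxEventsInMemory) {
          // the real code throws DeleteEventsOverflowMemoryException so the
          // reader can switch to the sort-merge based registry instead
          throw new IllegalStateException("too many delete events: " + size);
        }
        if (size >= rowIds.length) {
          // double while small, then grow by a fixed increment
          int newLength = rowIds.length <= INCREMENT
              ? rowIds.length * 2 : rowIds.length + INCREMENT;
          rowIds = Arrays.copyOf(rowIds, newLength);
        }
        rowIds[size++] = rowId;
      }

      long[] trimmed() {
        return Arrays.copyOf(rowIds, size);
      }
    }

Growing on demand this way means only the delete events that survive the KeyInterval/SARG filtering are ever counted against the in-memory cap, so a table-wide delete no longer forces every split onto the sort-merge path.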