diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3b8e820f61..0cecae5f1a 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1858,7 +1858,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + "into memory to optimize for performance. To prevent out-of-memory errors, this is a rough heuristic\n" + "that limits the total number of delete events that can be loaded into memory at once.\n" + "Roughly it has been set to 10 million delete events per bucket (~160 MB).\n"), - + FILTER_DELETE_EVENTS("hive.txn.filter.delete.events", true, + "If true, VectorizedOrcAcidRowBatchReader will compute min/max " + + "ROW__ID for the split and only load delete events in that range.\n" + ), HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0, "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index d82faf3e13..8c7a78b9c5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -983,7 +983,7 @@ public void process(Object row, int tag) throws HiveException { int writerOffset; // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same // for a given operator branch prediction should work quite nicely on it. - // RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we + // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we // pass the row rather than recordValue. if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { rowOutWriters[findWriterOffset(row)].write(recordValue); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index ea7ba53a3a..e38e21a3d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -176,6 +176,7 @@ protected int compareToInternal(RecordIdentifier other) { @Override public int compareTo(RecordIdentifier other) { if (other.getClass() != RecordIdentifier.class) { + //WTF? assumes that other instanceof OrcRawRecordMerger.ReaderKey??? return -other.compareTo(this); } return compareToInternal(other); @@ -219,17 +220,15 @@ public int hashCode() { @Override public String toString() { - BucketCodec codec = - BucketCodec.determineVersion(bucketId); - String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) + - "." + codec.decodeStatementId(bucketId) + ")"; - return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}"; + return "RecordIdentifier(" + writeId + ", " + bucketToString(bucketId) + "," + + getRowId() +")"; } - protected String bucketToString() { - if (bucketId == -1) return ("bucket: " + bucketId); + public static String bucketToString(int bucketId) { + if (bucketId == -1) return "" + bucketId; BucketCodec codec = BucketCodec.determineVersion(bucketId); - return "bucket: " + bucketId + "(" + codec.getVersion() + "." + - codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) + ")"; + return bucketId + "(" + codec.getVersion() + "." 
+ + codec.decodeWriterId(bucketId) + "." + + codec.decodeStatementId(bucketId) + ")"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 929ea9b1ed..6be0c74f4e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -20,10 +20,10 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.TreeMap; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -33,7 +33,6 @@ import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.SchemaEvolution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -168,7 +167,8 @@ int compareRow(RecordIdentifier other) { @Override public String toString() { return "{originalWriteId: " + getWriteId() + ", " + - bucketToString() + ", row: " + getRowId() + ", currentWriteId " + currentWriteId + "}"; + bucketToString(getBucketProperty()) + ", row: " + getRowId() + + ", currentWriteId " + currentWriteId + "}"; } } interface ReaderPair { @@ -664,20 +664,37 @@ private Reader advanceToNextFile() throws IOException { // The key of the next lowest reader. private ReaderKey secondaryKey = null; - - private static final class KeyInterval { + static final class KeyInterval { private final RecordIdentifier minKey; private final RecordIdentifier maxKey; - private KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { + KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { this.minKey = minKey; this.maxKey = maxKey; } - private RecordIdentifier getMinKey() { + RecordIdentifier getMinKey() { return minKey; } - private RecordIdentifier getMaxKey() { + RecordIdentifier getMaxKey() { return maxKey; + }; + @Override + public String toString() { + return "KeyInterval[" + minKey + "," + maxKey + "]"; } + @Override + public boolean equals(Object other) { + if(!(other instanceof KeyInterval)) { + return false; + } + KeyInterval otherInterval = (KeyInterval)other; + return Objects.equals(minKey, otherInterval.getMinKey()) && + Objects.equals(maxKey, otherInterval.getMaxKey()); + } + @Override + public int hashCode() { + return Objects.hash(minKey, maxKey); + } + } /** * Find the key range for original bucket files. 
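Note for reviewers (not part of the patch): the KeyInterval type added above is the backbone of the delete-event filtering introduced in the next file. VectorizedOrcAcidRowBatchReader computes a min/max ROW__ID interval for its split and skips any delete event that falls outside it. As orientation, here is a minimal, self-contained Java sketch of that idea; the names below (KeyIntervalSketch, Key, inRange) are illustrative stand-ins rather than the actual Hive classes, and the key is simplified to the (writeId, bucketProperty, rowId) triple that RecordIdentifier compares in dictionary order.

public class KeyIntervalSketch {
  static final class Key implements Comparable<Key> {
    final long writeId;
    final int bucketProperty;
    final long rowId;
    Key(long writeId, int bucketProperty, long rowId) {
      this.writeId = writeId;
      this.bucketProperty = bucketProperty;
      this.rowId = rowId;
    }
    @Override
    public int compareTo(Key o) {
      // same dictionary order that RecordIdentifier uses: writeId, then bucket, then rowId
      if (writeId != o.writeId) {
        return Long.compare(writeId, o.writeId);
      }
      if (bucketProperty != o.bucketProperty) {
        return Integer.compare(bucketProperty, o.bucketProperty);
      }
      return Long.compare(rowId, o.rowId);
    }
  }
  // a null bound means "unbounded", mirroring KeyInterval(null, null)
  static boolean inRange(Key min, Key max, Key event) {
    if (min != null && event.compareTo(min) < 0) {
      return false;
    }
    if (max != null && event.compareTo(max) > 0) {
      return false;
    }
    return true;
  }
  public static void main(String[] args) {
    Key min = new Key(1, 536870912, 0);
    Key max = new Key(1, 536870912, 2);
    System.out.println(inRange(min, max, new Key(1, 536870912, 1))); // true: load this delete event
    System.out.println(inRange(min, max, new Key(2, 536870912, 0))); // false: skip it
  }
}

In the patch itself the interval comes from the ACID key index recorded in the ORC footer (or from stripe column statistics), and the same range check is applied per delete event as it is read from a delete_delta file, so events outside the split's interval are never buffered in memory.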
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 1841cfaa2e..b30fb2da69 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -44,11 +44,19 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.IntegerColumnStatistics; +import org.apache.orc.OrcConf; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; @@ -99,6 +107,16 @@ * To have access to {@link RecordReader#getRowNumber()} in the underlying file */ private RecordReader innerReader; + /** + * min/max ROW__ID for the split (if available) so that we can limit the + * number of delete events to load in memory + */ + private final OrcRawRecordMerger.KeyInterval keyInterval; + /** + * {@link SearchArgument} pushed down to delete_deltaS + */ + private SearchArgument deleteEventSarg = null; + VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { @@ -110,7 +128,7 @@ this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false); - final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); + final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. innerReader = reader.rowsOptions(readerOptions.range(offset, length), conf); baseReader = new org.apache.hadoop.mapred.RecordReader() { @@ -197,20 +215,21 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("VectorizedOrcAcidRowBatchReader:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); + LOG.info("Read ValidWriteIdList: " + this.validWriteIdList.toString() + + ":" + orcSplit); // Clone readerOptions for deleteEvents. Reader.Options deleteEventReaderOptions = readerOptions.clone(); // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because // we always want to read all the delete delta files. deleteEventReaderOptions.range(0, Long.MAX_VALUE); - // Disable SARGs for deleteEventReaders, as SARGs have no meaning. - deleteEventReaderOptions.searchArgument(null, null); + keyInterval = findMinMaxKeys(orcSplit, conf, deleteEventReaderOptions); DeleteEventRegistry der; try { - // See if we can load all the delete events from all the delete deltas in memory... 
- der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + // See if we can load all the relevant delete events from all the + // delete deltas in memory... + der = new ColumnizedDeleteEventRegistry(conf, orcSplit, + deleteEventReaderOptions, keyInterval); } catch (DeleteEventsOverflowMemoryException e) { // If not, then create a set of hanging readers that do sort-merge to find the next smallest // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). @@ -232,13 +251,254 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList); } + /** + * Generates a SearchArgument to push down to delete_delta files. + * + * + * Note that bucket is a bit packed int, so even though all delete events + * for a given split have the same bucket ID, they may not have the same + * "bucket" value; see {@link BucketCodec}. + */ + private void setSARG(OrcRawRecordMerger.KeyInterval keyInterval, + Reader.Options deleteEventReaderOptions, + long minBucketProp, long maxBucketProp, long minRowId, long maxRowId) { + SearchArgument.Builder b = null; + if(keyInterval.getMinKey() != null) { + RecordIdentifier k = keyInterval.getMinKey(); + b = SearchArgumentFactory.newBuilder(); + b.startAnd() //not(ot < 7) -> ot >=7 + .startNot().lessThan("originalTransaction", + PredicateLeaf.Type.LONG, k.getWriteId()).end(); + b.startNot().lessThan( + "bucket", PredicateLeaf.Type.LONG, minBucketProp).end(); + b.startNot().lessThan("rowId", + PredicateLeaf.Type.LONG, minRowId).end(); + b.end(); + } + if(keyInterval.getMaxKey() != null) { + RecordIdentifier k = keyInterval.getMaxKey(); + if(b == null) { + b = SearchArgumentFactory.newBuilder(); + } + b.startAnd().lessThanEquals( + "originalTransaction", PredicateLeaf.Type.LONG, k.getWriteId()); + b.lessThanEquals("bucket", PredicateLeaf.Type.LONG, maxBucketProp); + b.lessThanEquals("rowId", PredicateLeaf.Type.LONG, maxRowId); + b.end(); + } + if(b != null) { + deleteEventSarg = b.build(); + LOG.info("deleteReader SARG(" + deleteEventSarg + ") "); + deleteEventReaderOptions.searchArgument(deleteEventSarg, + new String[] {"originalTransaction", "bucket", "rowId"}); + return; + } + deleteEventReaderOptions.searchArgument(null, null); + } public void setBaseAndInnerReader( - final org.apache.hadoop.mapred.RecordReader baseReader) { + final org.apache.hadoop.mapred.RecordReader baseReader) { this.baseReader = baseReader; this.innerReader = null; this.vectorizedRowBatchBase = baseReader.createValue(); } + /** + * A given ORC reader will always process one or more whole stripes but the + * split boundaries may not line up with stripe boundaries if the InputFormat + * doesn't understand ORC specifics. So first we need to figure out which + * stripe(s) we are reading. + * + * Suppose txn1 writes 100K rows + * and txn2 writes 100 rows so we have events + * {1,0,0}....{1,0,100K},{2,0,0}...{2,0,100} in 2 files. + * After compaction we may have 2 stripes + * {1,0,0}...{1,0,90K},{1,0,90001}...{2,0,100} + * + * Now suppose there is a delete stmt that deletes every row. So when we load + * the 2nd stripe, if we just look at stripe {@link ColumnStatistics}, + * minKey={1,0,100} and maxKey={2,0,90001}, all but the 1st 100 delete events + * will get loaded. But with {@link OrcRecordUpdater#ACID_KEY_INDEX_NAME}, + * minKey={1,0,90001} and maxKey={2,0,100} so we only load about 10K deletes.
+ * + * Also, even with Query Based compactor (once we have it), FileSinkOperator + * uses OrcRecordWriter to write to file, so we should have the + * hive.acid.index in place. + * + * If reading the 1st stripe, we don't have the start event, so we'll get it + * from stats, which will strictly speaking be accurate only wrt writeId and + * bucket but that is good enough. + * + * @return empty KeyInterval if KeyInterval could not be + * determined + */ + private OrcRawRecordMerger.KeyInterval findMinMaxKeys( + OrcSplit orcSplit, Configuration conf, + Reader.Options deleteEventReaderOptions) throws IOException { + if(!HiveConf.getBoolVar(conf, ConfVars.FILTER_DELETE_EVENTS)) { + LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + if(orcSplit.isOriginal()) { + /** + * Among originals we may have files with _copy_N suffix. To properly + * generate a synthetic ROW___ID for them we need + * {@link OffsetAndBucketProperty} which could be an expensive computation + * if there are lots of copy_N files for a given bucketId. But unless + * there are delete events, we often don't need synthetic ROW__IDs at all. + * Kind of chicken-and-egg - deal with this later. + * See {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, + * Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/ + LOG.debug("findMinMaxKeys(original split) - ignoring"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + //todo: since we already have OrcSplit.orcTail, should somehow use it to + // get the acid.index, stats, etc rather than fetching the footer again + // though it seems that orcTail is mostly null.... + Reader reader = OrcFile.createReader(orcSplit.getPath(), + OrcFile.readerOptions(conf)); + List stripes = reader.getStripes(); + final long splitStart = orcSplit.getStart(); + final long splitEnd = splitStart + orcSplit.getLength(); + int firstSripeIndex = -1; + int lastStripeIndex = -1; + for(int i = 0; i < stripes.size(); i++) { + StripeInformation stripe = stripes.get(i); + long stripeEnd = stripe.getOffset() + stripe.getLength(); + if(firstSripeIndex == -1 && stripe.getOffset() >= splitStart) { + firstSripeIndex = i; + } + if(lastStripeIndex == -1 && splitEnd <= stripeEnd && + stripes.get(firstSripeIndex).getOffset() <= stripe.getOffset() ) { + //the last condition is for when both splitStart and splitEnd are in + // the same stripe + lastStripeIndex = i; + } + } + if(lastStripeIndex == -1) { + //split goes to the EOF which is > end of stripe since file has a footer + assert stripes.get(stripes.size() - 1).getOffset() + + stripes.get(stripes.size() - 1).getLength() < splitEnd; + lastStripeIndex = stripes.size() - 1; + } + if(firstSripeIndex == -1 || lastStripeIndex == -1) { + //this should not happen but... if we don't know which stripe(s) are + //involved we can't figure out min/max bounds + LOG.warn("Could not find stripe (" + firstSripeIndex + "," + + lastStripeIndex + ")"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader); + if(keyIndex == null || keyIndex.length != stripes.size()) { + LOG.warn("Could not find keyIndex or length doesn't match (" + + firstSripeIndex + "," + lastStripeIndex + "," + stripes.size() + "," + + (keyIndex == null ? 
-1 : keyIndex.length) + ")"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + /** + * If {@link OrcConf.ROW_INDEX_STRIDE} is set to 0 all column stats on the + * ORC file are disabled, though objects for them exist and have + * min/max set to MIN_LONG/MAX_LONG, so we only use column stats if they + * are actually computed. Streaming ingest and Minor compaction used to + * set it to 0, so there are lots of legacy files with no (rather, bad) + * column stats*/ + boolean columnStatsPresent = reader.getRowIndexStride() > 0; + if(!columnStatsPresent) { + LOG.debug("findMinMaxKeys() No ORC column stats"); + } + RecordIdentifier minKey = null; + if(firstSripeIndex > 0) { + //valid keys are strictly greater than this key + minKey = keyIndex[firstSripeIndex - 1]; + //add 1 to make comparison >= to match the case of 0th stripe + minKey.setRowId(minKey.getRowId() + 1); + } + else { + List stats = reader.getStripeStatistics(); + assert stripes.size() == stats.size() : "str.s=" + stripes.size() + + " sta.s=" + stats.size(); + if(columnStatsPresent) { + ColumnStatistics[] colStats = + stats.get(firstSripeIndex).getColumnStatistics(); + /* + Structure in data is like this: + <operation, originalWriteId, bucket, rowId, currentWriteId, <row>> + The +1 is to account for the top level struct which has a + ColumnStatistics object in colsStats. Top level struct is normally + dropped by the Reader (I guess because of orc.impl.SchemaEvolution) + */ + IntegerColumnStatistics origWriteId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1]; + IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.BUCKET + 1]; + IntegerColumnStatistics rowId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ROW_ID + 1]; + //we may want to change bucketProperty from int to long in the + // future (across the stack); this protects the following cast to int + assert bucketProperty.getMinimum() <= Integer.MAX_VALUE : + "was bucketProperty changed to a long (" + + bucketProperty.getMinimum() + ")?!:" + orcSplit; + //this is a lower bound but not necessarily the greatest lower bound + minKey = new RecordIdentifier(origWriteId.getMinimum(), + (int) bucketProperty.getMinimum(), rowId.getMinimum()); + } + } + OrcRawRecordMerger.KeyInterval keyInterval = + new OrcRawRecordMerger.KeyInterval(minKey, keyIndex[lastStripeIndex]); + LOG.info("findMinMaxKeys(): " + keyInterval + + " stripes(" + firstSripeIndex + "," + lastStripeIndex + ")"); + + long minBucketProp = Long.MAX_VALUE, maxBucketProp = Long.MIN_VALUE; + long minRowId = Long.MAX_VALUE, maxRowId = Long.MIN_VALUE; + if(columnStatsPresent) { + /** + * figure out min/max bucket, rowid for push down. This is different from + * min/max ROW__ID because ROW__ID comparison uses dictionary order on two + * tuples (a,b,c), but PPD can only do + * (a between (x,y) and b between(x1,y1) and c between(x2,y2)) + * Consider: + * (0,536936448,0), (0,536936448,2), (10000001,536936448,0) + * 1st is min ROW_ID, 3rd is max ROW_ID + * and Delete events (0,536936448,2),....,(10000001,536936448,1000000) + * So PPD based on min/max ROW_ID would have 0<= rowId <=0 which will + * miss this delete event. But we still want PPD to filter out data if + * possible.
+ * + * So use stripe stats to find proper min/max for bucketProp and rowId; + * writeId is the same in both cases. + */ + List stats = reader.getStripeStatistics(); + for(int i = firstSripeIndex; i <= lastStripeIndex; i++) { + ColumnStatistics[] colStats = stats.get(i) + .getColumnStatistics(); + IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.BUCKET + 1]; + IntegerColumnStatistics rowId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ROW_ID + 1]; + if(bucketProperty.getMinimum() < minBucketProp) { + minBucketProp = bucketProperty.getMinimum(); + } + if(bucketProperty.getMaximum() > maxBucketProp) { + maxBucketProp = bucketProperty.getMaximum(); + } + if(rowId.getMinimum() < minRowId) { + minRowId = rowId.getMinimum(); + } + if(rowId.getMaximum() > maxRowId) { + maxRowId = rowId.getMaximum(); + } + } + } + if(minBucketProp == Long.MAX_VALUE) minBucketProp = Long.MIN_VALUE; + if(maxBucketProp == Long.MIN_VALUE) maxBucketProp = Long.MAX_VALUE; + if(minRowId == Long.MAX_VALUE) minRowId = Long.MIN_VALUE; + if(maxRowId == Long.MIN_VALUE) maxRowId = Long.MAX_VALUE; + + setSARG(keyInterval, deleteEventReaderOptions, minBucketProp, maxBucketProp, + minRowId, maxRowId); + return keyInterval; + } + /** * Used for generating synthetic ROW__IDs for reading "original" files */ @@ -627,7 +887,7 @@ DeleteEventRegistry getDeleteEventRegistry() { * will read the delete delta files and will create their own internal * data structures to maintain record ids of the records that got deleted. */ - protected static interface DeleteEventRegistry { + protected interface DeleteEventRegistry { /** * Modifies the passed bitset to indicate which of the rows in the batch * have been deleted. Assumes that the batch.size is equal to bitset size. @@ -657,6 +917,9 @@ DeleteEventRegistry getDeleteEventRegistry() { * delete events. This internally uses the OrcRawRecordMerger and maintains a constant * amount of memory usage, given the number of delete delta files. Therefore, this * implementation will be picked up when the memory pressure is high. + * + * Don't bother to use KeyInterval from the split here since this doesn't + * buffer delete events in memory. */ static class SortMergedDeleteEventRegistry implements DeleteEventRegistry { private OrcRawRecordMerger deleteRecords; @@ -665,16 +928,15 @@ DeleteEventRegistry getDeleteEventRegistry() { private Boolean isDeleteRecordAvailable = null; private ValidWriteIdList validWriteIdList; - SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) - throws IOException { + SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, + Reader.Options readerOptions) throws IOException { final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltas.length > 0) { int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ?
new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("SortMergedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); + LOG.debug("Using SortMergedDeleteEventRegistry"); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true); assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly"; this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, @@ -812,6 +1074,7 @@ public void close() throws IOException { * A simple wrapper class to hold the (owid, bucketProperty, rowId) pair. */ static class DeleteRecordKey implements Comparable { + private static final DeleteRecordKey otherKey = new DeleteRecordKey(); private long originalWriteId; /** * see {@link BucketCodec} @@ -844,9 +1107,18 @@ public int compareTo(DeleteRecordKey other) { } return 0; } + private int compareTo(RecordIdentifier other) { + if (other == null) { + return -1; + } + otherKey.set(other.getWriteId(), other.getBucketProperty(), + other.getRowId()); + return compareTo(otherKey); + } @Override public String toString() { - return "owid: " + originalWriteId + " bucketP:" + bucketProperty + " rowid: " + rowId; + return "DeleteRecordKey(" + originalWriteId + "," + + RecordIdentifier.bucketToString(bucketProperty) + "," + rowId +")"; } } @@ -868,10 +1140,30 @@ public String toString() { private boolean isBucketPropertyRepeating; private final boolean isBucketedTable; private final Reader reader; + private final Path deleteDeltaFile; + private final OrcRawRecordMerger.KeyInterval keyInterval; + private final OrcSplit orcSplit; + /** + * total number in the file + */ + private final long numEvents; + /** + * number of events lifted from disk + * some may be skipped due to PPD + */ + private long numEventsFromDisk = 0; + /** + * number of events actually loaded in memory + */ + private long numEventsLoaded = 0; - DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, - ValidWriteIdList validWriteIdList, boolean isBucketedTable, final JobConf conf) throws IOException { + DeleteReaderValue(Reader deleteDeltaReader, Path deleteDeltaFile, + Reader.Options readerOptions, int bucket, ValidWriteIdList validWriteIdList, + boolean isBucketedTable, final JobConf conf, + OrcRawRecordMerger.KeyInterval keyInterval, OrcSplit orcSplit) + throws IOException { this.reader = deleteDeltaReader; + this.deleteDeltaFile = deleteDeltaFile; this.recordReader = deleteDeltaReader.rowsOptions(readerOptions, conf); this.bucketForSplit = bucket; final boolean useDecimal64ColumnVector = HiveConf.getVar(conf, ConfVars @@ -887,7 +1179,13 @@ public String toString() { this.indexPtrInBatch = 0; this.validWriteIdList = validWriteIdList; this.isBucketedTable = isBucketedTable; - checkBucketId();//check 1st batch + if(batch != null) { + checkBucketId();//check 1st batch + } + this.keyInterval = keyInterval; + this.orcSplit = orcSplit; + this.numEvents = deleteDeltaReader.getNumberOfRows(); + LOG.debug("Num events stats({},x,x)", numEvents); } public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { @@ -910,15 +1208,36 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { checkBucketId(deleteRecordKey.bucketProperty); } ++indexPtrInBatch; + numEventsFromDisk++; + if(!isDeleteEventInRange(keyInterval, deleteRecordKey)) { + continue; + } if (validWriteIdList.isWriteIdValid(currentWriteId)) { 
isValidNext = true; } } + numEventsLoaded++; + return true; + } + static boolean isDeleteEventInRange( + OrcRawRecordMerger.KeyInterval keyInterval, + DeleteRecordKey deleteRecordKey) { + if(keyInterval.getMinKey() != null && + deleteRecordKey.compareTo(keyInterval.getMinKey()) < 0) { + //current deleteEvent is < than minKey + return false; + } + if(keyInterval.getMaxKey() != null && + deleteRecordKey.compareTo(keyInterval.getMaxKey()) > 0) { + //current deleteEvent is > than maxKey + return false; + } return true; } - public void close() throws IOException { this.recordReader.close(); + LOG.debug("Num events stats({},{},{})", + numEvents, numEventsFromDisk, numEventsLoaded); } private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) { int originalWriteIdIndex = @@ -964,11 +1283,11 @@ private void checkBucketId(int bucketPropertyFromRecord) throws IOException { .decodeWriterId(bucketPropertyFromRecord); if(bucketIdFromRecord != bucketForSplit) { DeleteRecordKey dummy = new DeleteRecordKey(); - long curTxnId = setCurrentDeleteKey(dummy); + setCurrentDeleteKey(dummy); throw new IOException("Corrupted records with different bucket ids " - + "from the containing bucket file found! Expected bucket id " - + bucketForSplit + ", however found the bucket id " + bucketIdFromRecord + - " from " + dummy + " curTxnId: " + curTxnId); + + "from the containing bucket file found! Expected bucket id " + + bucketForSplit + ", however found " + dummy + + ". (" + orcSplit + "," + deleteDeltaFile + ")"); } } @@ -1022,26 +1341,37 @@ public int compareTo(CompressedOwid other) { * of bucketIds where each entry points at an array of rowIds. We could probably use ArrayList * to manage insertion as the structure is built (LinkedList?). This should reduce memory * footprint (as far as OrcReader to a single reader) - probably bad for LLAP IO + * Or much simpler, make compaction of delete deltas very aggressive so that + * we never have move than a few delete files to read. */ private TreeMap sortMerger; private long rowIds[]; private CompressedOwid compressedOwids[]; private ValidWriteIdList validWriteIdList; - private Boolean isEmpty = null; + private Boolean isEmpty; + private final int maxEventsInMemory; + private final OrcSplit orcSplit; + private final boolean testMode; + + ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, - Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + Reader.Options readerOptions, + OrcRawRecordMerger.KeyInterval keyInterval) + throws IOException, DeleteEventsOverflowMemoryException { + this.testMode = conf.getBoolean(ConfVars.HIVE_IN_TEST.varname, false); int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? 
new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("ColumnizedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); - this.sortMerger = new TreeMap(); + LOG.debug("Using ColumnizedDeleteEventRegistry"); + this.sortMerger = new TreeMap<>(); this.rowIds = null; this.compressedOwids = null; - int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); + maxEventsInMemory = HiveConf + .getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); final boolean isBucketedTable = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0; + this.orcSplit = orcSplit; try { final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit); @@ -1059,27 +1389,18 @@ public int compareTo(CompressedOwid other) { // NOTE: A check for existence of deleteDeltaFile is required because we may not have // deletes for the bucket being taken into consideration for this split processing. if (length != -1 && fs.exists(deleteDeltaFile)) { + /** + * todo: we have OrcSplit.orcTail so we should be able to get stats from there + */ Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf).maxLength(length)); - AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader); - if (acidStats.deletes == 0) { + if (deleteDeltaReader.getNumberOfRows() <= 0) { continue; // just a safe check to ensure that we are not reading empty delete files. } - totalDeleteEventCount += acidStats.deletes; - if (totalDeleteEventCount > maxEventsInMemory) { - // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas - // into memory. To prevent out-of-memory errors, this check is a rough heuristic that - // prevents creation of an object of this class if the total number of delete events - // exceed this value. By default, it has been set to 10 million delete events per bucket. - LOG.info("Total number of delete events exceeds the maximum number of delete events " - + "that can be loaded into memory for the delete deltas in the directory at : " - + deleteDeltaDirs.toString() +". The max limit is currently set at " - + maxEventsInMemory + " and can be changed by setting the Hive config variable " - + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname); - throw new DeleteEventsOverflowMemoryException(); - } + totalDeleteEventCount += deleteDeltaReader.getNumberOfRows(); DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, - readerOptions, bucket, validWriteIdList, isBucketedTable, conf); + deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf, + keyInterval, orcSplit); DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); if (deleteReaderValue.next(deleteRecordKey)) { sortMerger.put(deleteRecordKey, deleteReaderValue); @@ -1089,11 +1410,9 @@ public int compareTo(CompressedOwid other) { } } } - // Note: totalDeleteEventCount can actually be higher than real value. - // We assume here it won't be lower. Maybe we should just read and not guess... 
- if (totalDeleteEventCount > 0) { - readAllDeleteEventsFromDeleteDeltas(totalDeleteEventCount); - } + readAllDeleteEventsFromDeleteDeltas(); + LOG.debug("Number of delete events(limit, actual)=({},{})", + totalDeleteEventCount, size()); } isEmpty = compressedOwids == null || rowIds == null; } catch(IOException|DeleteEventsOverflowMemoryException e) { @@ -1101,7 +1420,26 @@ public int compareTo(CompressedOwid other) { throw e; // rethrow the exception so that the caller can handle. } } - + private void checkSize(int index) throws DeleteEventsOverflowMemoryException { + if(index > maxEventsInMemory) { + //check to prevent OOM errors + LOG.info("Total number of delete events exceeds the maximum number of " + + "delete events that can be loaded into memory for " + orcSplit + + ". The max limit is currently set at " + maxEventsInMemory + + " and can be changed by setting the Hive config variable " + + ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY.varname); + throw new DeleteEventsOverflowMemoryException(); + } + if(index < rowIds.length) { + return; + } + int newLength = rowIds.length + 1000000; + if(rowIds.length <= 1000000) { + //double small arrays; increase by 1M large arrays + newLength = rowIds.length * 2; + } + rowIds = Arrays.copyOf(rowIds, newLength); + } /** * This is not done quite right. The intent of {@link CompressedOwid} is a hedge against * "delete from T" that generates a huge number of delete events possibly even 2G - max array @@ -1110,15 +1448,14 @@ public int compareTo(CompressedOwid other) { * In practice we should be filtering delete evens by min/max ROW_ID from the split. The later * is also not yet implemented: HIVE-16812. */ - private void readAllDeleteEventsFromDeleteDeltas( - int totalDeleteEventCount) throws IOException { + private void readAllDeleteEventsFromDeleteDeltas() + throws IOException, DeleteEventsOverflowMemoryException { if (sortMerger == null || sortMerger.isEmpty()) { - rowIds = new long[0]; return; // trivial case, nothing to read. } // Initialize the rowId array when we have some delete events. - rowIds = new long[totalDeleteEventCount]; + rowIds = new long[testMode ? 1 : 10000]; int index = 0; // We compress the owids into CompressedOwid data structure that records @@ -1140,6 +1477,7 @@ private void readAllDeleteEventsFromDeleteDeltas( DeleteReaderValue deleteReaderValue = entry.getValue(); long owid = deleteRecordKey.originalWriteId; int bp = deleteRecordKey.bucketProperty; + checkSize(index); rowIds[index] = deleteRecordKey.rowId; if (lastCo == null || lastCo.originalWriteId != owid || lastCo.bucketProperty != bp) { if (lastCo != null) { @@ -1198,6 +1536,12 @@ private boolean isDeleted(long owid, int bucketProperty, long rowId) { } return false; } + /** + * @return how many delete events are actually loaded + */ + int size() { + return rowIds == null ? 
0 : rowIds.length; + } @Override public boolean isEmpty() { if(isEmpty == null) { @@ -1206,8 +1550,7 @@ public boolean isEmpty() { return isEmpty; } @Override - public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) - throws IOException { + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) { if (rowIds == null || compressedOwids == null) { return; } @@ -1258,4 +1601,12 @@ public void close() throws IOException { static class DeleteEventsOverflowMemoryException extends Exception { private static final long serialVersionUID = 1L; } + @VisibleForTesting + OrcRawRecordMerger.KeyInterval getKeyInterval() { + return keyInterval; + } + @VisibleForTesting + SearchArgument getDeleteEventSarg() { + return deleteEventSarg; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 9a4322d74b..0cda871262 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -95,6 +95,15 @@ protected String getTestDataDir() { return TEST_DATA_DIR; } + @Override + void initHiveConf() { + super.initHiveConf(); + //TestTxnCommandsWithSplitUpdateAndVectorization has the vectorized version + //of these tests. + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + } + + /** * tests that a failing Insert Overwrite (which creates a new base_x) is properly marked as * aborted. diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index c5589b953d..057fd7704c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -144,6 +144,9 @@ void setUpWithTableProperties(String tableProperties) throws Exception { "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + //TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version + //of these tests. + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(hiveConf); @@ -716,7 +719,7 @@ public void testNonAcidToAcidConversion3() throws Exception { int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default) + // 2. 
Convert NONACIDORCTBL to ACID table runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java new file mode 100644 index 0000000000..9c3cb20ff2 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -0,0 +1,162 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.orc.TestVectorizedOrcAcidRowBatchReader; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +public class TestTxnCommands3 extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class); + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands3.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + @Override + protected String getTestDataDir() { + return TEST_DATA_DIR; + } + + @Test + public void testRenameTable() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("drop database if exists mydb1 cascade"); + runStatementOnDriver("drop database if exists mydb2 cascade"); + runStatementOnDriver("create database mydb1"); + runStatementOnDriver("create database mydb2"); + runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); + runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); + //put something in WRITE_SET + runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); + runStatementOnDriver("alter table mydb1.T compact 'minor'"); + + runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); + + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "s/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "s/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * 
from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='s'")); + Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); + + runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar"); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='bar'")); + Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'")); + } + + @Test + public void testDeleteEventPruningOn() throws Exception { + HiveConf.setBoolVar(hiveConf, + HiveConf.ConfVars.FILTER_DELETE_EVENTS, true); + testDeleteEventPruning(); + } + @Test + public void testDeleteEventPruningOff() throws Exception { + HiveConf.setBoolVar(hiveConf, + HiveConf.ConfVars.FILTER_DELETE_EVENTS, false); + testDeleteEventPruning(); + } + /** + * run with and w/o event filtering enabled - should get the same results + * {@link TestVectorizedOrcAcidRowBatchReader#testDeleteEventFiltering()} + * + * todo: add .q test using VerifyNumReducersHook.num.reducers to make sure + * it does have 1 split for each input file. + * Will need to create VerifyNumMappersHook + * + * Also, consider + * HiveSplitGenerator.java + * RAW_INPUT_SPLITS and GROUPED_INPUT_SPLITS are the counters before and + * after grouping splits. The PostExecTezSummaryPrinter post exec hook can be + * used to print out specific counters + */ + private void testDeleteEventPruning() throws Exception { + HiveConf.setBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + dropTable(new String[] {"T"}); + runStatementOnDriver( + "create transactional table T(a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(1,2),(4,5)"); + runStatementOnDriver("insert into T values(4,6),(1,3)"); + runStatementOnDriver("delete from T where a = 1"); + List rs = runStatementOnDriver( + "select ROW__ID, a, b from T order by a, b"); + + boolean isVectorized = + hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); + String testQuery = isVectorized ?
+ "select ROW__ID, a, b from T order by a, b" : + "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b"; + String[][] expected = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", + "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "warehouse/t/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, isVectorized, "after delete", LOG); + + runStatementOnDriver("alter table T compact 'MAJOR'"); + runWorker(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", + 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", + TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId() + .startsWith("job_local")); + + String[][] expected2 = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", + "warehouse/t/base_0000001/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "warehouse/t/base_0000002/bucket_00000"}}; + checkResult(expected2, testQuery, isVectorized, "after compaction", LOG); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index a5bd1cbd67..e882c94cfe 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -176,67 +176,4 @@ public void testConcatenateMM() throws Exception { "t/base_0000002/000000_0"}}; checkResult(expected2, testQuery, false, "check data after concatenate", LOG); } - @Test - public void testRenameTable() throws Exception { - MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); - runStatementOnDriver("drop database if exists mydb1 cascade"); - runStatementOnDriver("drop database if exists mydb2 cascade"); - runStatementOnDriver("create database mydb1"); - runStatementOnDriver("create database mydb2"); - runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); - runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); - //put something in WRITE_SET - runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); - runStatementOnDriver("alter table mydb1.T compact 'minor'"); - - runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); - - String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; - String[][] expected = new String[][] { - {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", - "s/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", - "s/delta_0000002_0000002_0000/bucket_00000"}}; - checkResult(expected, testQuery, false, "check data", LOG); - - - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); - Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, - "select 
count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); - - Assert.assertEquals( - TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, - TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='s'")); - Assert.assertEquals(3, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); - - runStatementOnDriver("alter table mydb1.S RENAME TO mydb2.bar"); - - Assert.assertEquals( - TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, - TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from WRITE_SET where WS_TABLE='bar'")); - Assert.assertEquals(4, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); - Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, - "select count(*) from NEXT_WRITE_ID where NWI_TABLE='bar'")); - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index 230e93e814..f7d39e5be3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -46,6 +47,11 @@ @Rule public TemporaryFolder folder= new TemporaryFolder(); + @Before + public void setUp() throws Exception { + setUpInternal(); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + } @Override protected String getTestDataDir() { return TEST_DATA_DIR; diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 948243609f..c74a707e4b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -58,6 +59,13 @@ protected String getTestDataDir() { return TEST_DATA_DIR; } + @Before + public void setUp() throws Exception { + setUpInternal(); + //see TestTxnNoBucketsVectorized for vectorized version + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + } + private boolean shouldVectorize() { return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 
05ce3e214d..dab0d982c9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -249,4 +249,9 @@ protected void checkResult(String[][] expectedResult, String query, boolean isVe checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); assertVectorized(isVectorized, query); } + void dropTable(String[] tabs) throws Exception { + for(String tab : tabs) { + d.run("drop table if exists " + tab); + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 208aeb5b1f..a8ee744e99 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -794,8 +794,8 @@ public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException { int readsAfter = fs.statistics.getReadOps(); System.out.println("STATS TRACE END - " + testCaseName.getMethodName()); int delta = readsAfter - readsBefore; - // 16 without HIVE-19588, 8 with HIVE-19588 - assertEquals(8, delta); + //HIVE-16812 adds 1 read of the footer of each file + assertEquals(16, delta); } finally { MockFileSystem.clearGlobalFiles(); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 551e5ca0a6..8f477f4700 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.util.List; @@ -40,6 +36,8 @@ import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.ColumnizedDeleteEventRegistry; import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader.SortMergedDeleteEventRegistry; + +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.io.LongWritable; @@ -49,6 +47,8 @@ import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + /** * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set * of delete delta files. 
The split is on an insert delta and there are multiple delete deltas @@ -62,6 +62,7 @@ private JobConf conf; private FileSystem fs; private Path root; + private ObjectInspector inspector; public static class DummyRow { LongWritable field; @@ -90,7 +91,6 @@ static String getColumnTypesProperty() { @Before public void setup() throws Exception { conf = new JobConf(); - conf.set("bucket_count", "1"); conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); conf.setBoolean(HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, true); conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default"); @@ -107,11 +107,273 @@ public void setup() throws Exception { fs = root.getFileSystem(conf); root = fs.makeQualified(root); fs.delete(root, true); - ObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = ObjectInspectorFactory.getReflectionObjectInspector (DummyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } + } + @Test + public void testDeleteEventFilteringOff() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false); + testDeleteEventFiltering(); + } + @Test + public void testDeleteEventFilteringOn() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true); + testDeleteEventFiltering(); + } + + /** + * Tests that we can figure out min/max ROW__ID for each split and then use + * that to only load delete events between min/max. + * This test doesn't actually check what is read - that is done more E2E + * unit tests. + * @throws Exception + */ + private void testDeleteEventFiltering() throws Exception { + boolean filterOn = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS); + int bucket = 0; + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .filesystem(fs) + .bucket(bucket) + .writingBase(false) + .minimumWriteId(1) + .maximumWriteId(1) + .inspector(inspector) + .reporter(Reporter.NULL) + .recordIdColumn(1) + .finalDestination(root); + + int bucketProperty = BucketCodec.V1.encode(options); + + //create 3 insert deltas so that we have 3 splits + RecordUpdater updater = new OrcRecordUpdater(root, options); + updater.insert(options.getMinimumWriteId(), + new DummyRow(1, 0, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(2, 1, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(3, 2, options.getMinimumWriteId(), bucket)); + updater.close(false); + + options.minimumWriteId(2) + .maximumWriteId(2); + updater = new OrcRecordUpdater(root, options); + updater.insert(options.getMinimumWriteId(), + new DummyRow(4, 0, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(5, 1, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(6, 2, options.getMinimumWriteId(), bucket)); + updater.close(false); + + options.minimumWriteId(3) + .maximumWriteId(3); + updater = new OrcRecordUpdater(root, options); + updater.insert(options.getMinimumWriteId(), + new DummyRow(7, 0, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(8, 1, options.getMinimumWriteId(), bucket)); + updater.insert(options.getMinimumWriteId(), + new DummyRow(9, 2, options.getMinimumWriteId(), bucket)); + updater.close(false); + + //delete 1 row from each of the insert deltas + options.minimumWriteId(4) + .maximumWriteId(4); + updater = new 
+    updater = new OrcRecordUpdater(root, options);
+    updater.delete(options.getMinimumWriteId(),
+        new DummyRow(-1, 0, 1, bucket));
+    updater.delete(options.getMinimumWriteId(),
+        new DummyRow(-1, 1, 2, bucket));
+    updater.delete(options.getMinimumWriteId(),
+        new DummyRow(-1, 2, 3, bucket));
+    updater.close(false);
+
+    //HWM is not important - just make sure deltas created above are read as
+    // if committed
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY,
+        "tbl:5:" + Long.MAX_VALUE + "::");
+
+    //now we have 3 delete events total, but for each split we should only
+    // load 1 into DeleteRegistry (if filtering is on)
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    assertEquals(3, splits.size());
+    assertEquals(root.toUri().toString() + File.separator +
+        "delta_0000001_0000001_0000/bucket_00000",
+        splits.get(0).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+
+    assertEquals(root.toUri().toString() + File.separator +
+        "delta_0000002_0000002_0000/bucket_00000",
+        splits.get(1).getPath().toUri().toString());
+    assertFalse(splits.get(1).isOriginal());
+
+    assertEquals(root.toUri().toString() + File.separator +
+        "delta_0000003_0000003_0000/bucket_00000",
+        splits.get(2).getPath().toUri().toString());
+    assertFalse(splits.get(2).isOriginal());
+
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL,
+            new VectorizedRowBatchCtx());
+    ColumnizedDeleteEventRegistry deleteEventRegistry =
+        (ColumnizedDeleteEventRegistry) vectorizedReader
+            .getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 1", filterOn ? 1 : 3,
+        deleteEventRegistry.size());
+    OrcRawRecordMerger.KeyInterval keyInterval =
+        vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(1, bucketProperty, 0),
+          new RecordIdentifier(1, bucketProperty, 2)),
+          keyInterval);
+    }
+    else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+
+    vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(1), conf,
+        Reporter.NULL, new VectorizedRowBatchCtx());
+    deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader
+        .getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 2", filterOn ? 1 : 3,
+        deleteEventRegistry.size());
+    keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(2, bucketProperty, 0),
+          new RecordIdentifier(2, bucketProperty, 2)),
+          keyInterval);
+    }
+    else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+
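+    //third split (delta_0000003_0000003_0000): with filtering on, only the
+    // delete event for originalWriteId 3 falls in its ROW__ID range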
+    vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(2), conf,
+        Reporter.NULL, new VectorizedRowBatchCtx());
+    deleteEventRegistry = (ColumnizedDeleteEventRegistry) vectorizedReader
+        .getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 3", filterOn ? 1 : 3,
+        deleteEventRegistry.size());
+    keyInterval = vectorizedReader.getKeyInterval();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(3, bucketProperty, 0),
+          new RecordIdentifier(3, bucketProperty, 2)), keyInterval);
+    }
+    else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+    }
+  }
+
+  @Test
+  public void testDeleteEventFilteringOff2() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
+    testDeleteEventFiltering2();
+  }
+  @Test
+  public void testDeleteEventFilteringOn2() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    testDeleteEventFiltering2();
+  }
+
+  private void testDeleteEventFiltering2() throws Exception {
+    boolean filterOn =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+    int bucket = 1;
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .filesystem(fs)
+        .bucket(bucket)
+        .writingBase(true)
+        .minimumWriteId(10000002)
+        .maximumWriteId(10000002)
+        .inspector(inspector)
+        .reporter(Reporter.NULL)
+        .recordIdColumn(1)
+        .finalDestination(root);
+
+    int bucketProperty = BucketCodec.V1.encode(options);
+
+    //create data that looks like a compacted base that includes some data
+    //from 'original' files and some from native Acid write
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+    updater.insert(0, new DummyRow(1, 0, 0, bucket));
+    updater.insert(0, new DummyRow(1, 1, 0, bucket));
+    updater.insert(0, new DummyRow(2, 2, 0, bucket));
+    updater.insert(10000001, new DummyRow(3, 0, 10000001, bucket));
+    updater.close(false);
+
+    //delete 3rd row
+    options.writingBase(false).minimumWriteId(10000004)
+        .maximumWriteId(10000004);
+    updater = new OrcRecordUpdater(root, options);
+    updater.delete(options.getMinimumWriteId(),
+        new DummyRow(-1, 0, 0, bucket));
+    //hypothetically this matches something in (nonexistent here)
+    // delta_10000003_10000003
+    updater.delete(options.getMinimumWriteId(),
+        new DummyRow(-1, 5, 10000003, bucket));
+    updater.close(false);
+
+    //HWM is not important - just make sure deltas created above are read as
+    // if committed
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY,
+        "tbl:10000005:" + Long.MAX_VALUE + "::");
+
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    assertEquals(1, splits.size());
+    assertEquals(root.toUri().toString() + File.separator +
+        "base_10000002/bucket_00001",
+        splits.get(0).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+
+
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL,
+            new VectorizedRowBatchCtx());
+    ColumnizedDeleteEventRegistry deleteEventRegistry =
+        (ColumnizedDeleteEventRegistry) vectorizedReader
+            .getDeleteEventRegistry();
+    assertEquals("number of delete events for stripe 1", filterOn ? 1 : 2,
+        deleteEventRegistry.size());
+    OrcRawRecordMerger.KeyInterval keyInterval =
+        vectorizedReader.getKeyInterval();
+    SearchArgument sarg = vectorizedReader.getDeleteEventSarg();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(
+          new RecordIdentifier(0, bucketProperty, 0),
+          new RecordIdentifier(10000001, bucketProperty, 0)),
+          keyInterval);
+      //key point is that leaf-5 is (rowId <= 2) even though maxKey has
+      //rowId 0; see VectorizedOrcAcidRowBatchReader.findMinMaxKeys
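+      //the SARG bounds each ROW__ID column independently: (not leaf-0/1/2) is
+      // the >= minKey side, leaf-3/4/5 the <= maxKey side, so the rowId bound
+      // must cover the largest rowId anywhere in the split (2), not maxKey's own rowId (0)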
+      assertEquals( "leaf-0 = (LESS_THAN originalTransaction 0)," +
+          " leaf-1 = (LESS_THAN bucket 536936448)," +
+          " leaf-2 = (LESS_THAN rowId 0)," +
+          " leaf-3 = (LESS_THAN_EQUALS originalTransaction 10000001)," +
+          " leaf-4 = (LESS_THAN_EQUALS bucket 536936448)," +
+          " leaf-5 = (LESS_THAN_EQUALS rowId 2)," +
+          " expr = (and (not leaf-0) (not leaf-1) " +
+          "(not leaf-2) leaf-3 leaf-4 leaf-5)", sarg.toString());
+    }
+    else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+      assertNull(sarg);
+    }
+
+  }
+  @Test
+  public void testVectorizedOrcAcidRowBatchReader() throws Exception {
+    conf.set("bucket_count", "1");
+    int bucket = 0;
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .filesystem(fs)
@@ -177,28 +439,7 @@ public void setup() throws Exception {
       }
     }
     updater.close(false);
-  }
-  private List<OrcSplit> getSplits() throws Exception {
-    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
-        AcidUtils.AcidOperationalProperties.getDefault().toInt());
-    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
-    OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
-    OrcInputFormat.AcidDirInfo adi = gen.call();
-    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
-        null, null, true);
-    assertEquals(1, splitStrategies.size());
-    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
-    assertEquals(1, splits.size());
-    assertEquals(root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000",
-        splits.get(0).getPath().toUri().toString());
-    assertFalse(splits.get(0).isOriginal());
-    return splits;
-  }
-
-  @Test
-  public void testVectorizedOrcAcidRowBatchReader() throws Exception {
     testVectorizedOrcAcidRowBatchReader(ColumnizedDeleteEventRegistry.class.getName());

     // To test the SortMergedDeleteEventRegistry, we need to explicitly set the
@@ -213,7 +454,16 @@ public void testVectorizedOrcAcidRowBatchReader() throws Exception {
   private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) throws Exception {
-    List<OrcSplit> splits = getSplits();
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = getSplitStrategies();
+    assertEquals(1, splitStrategies.size());
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)
+        splitStrategies.get(0)).getSplits();
+    assertEquals(1, splits.size());
+    assertEquals(root.toUri().toString() + File.separator +
+        "delta_0000001_0000010_0000/bucket_00000",
+        splits.get(0).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+
     // Mark one of the transactions as an exception to test that invalid transactions
     // are being handled properly.
     conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, "tbl:14:1:1:5"); // Exclude transaction 5
@@ -244,4 +494,16 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) thr
       }
     }
   }
+  private List<OrcInputFormat.SplitStrategy<?>> getSplitStrategies() throws Exception {
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
+        context, fs, root, false, null);
+    OrcInputFormat.AcidDirInfo adi = gen.call();
+    return OrcInputFormat.determineSplitStrategies(
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
+        null, null, true);
+
+  }
 }
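
Note (reviewer sketch, not part of the patch): the tests above exercise what hive.txn.filter.delete.events enables - VectorizedOrcAcidRowBatchReader derives a min/max ROW__ID interval for its split and only loads delete events that fall inside it. The fragment below is a minimal illustration of that interval check using the public RecordIdentifier ordering; the class name KeyIntervalFilterSketch is invented for this sketch, and the real logic lives in VectorizedOrcAcidRowBatchReader (findMinMaxKeys and the delete event registries).

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

/** Illustrative sketch only - mirrors the [minKey, maxKey] check asserted in the tests above. */
public class KeyIntervalFilterSketch {
  private final RecordIdentifier minKey; // null means no lower bound was computed
  private final RecordIdentifier maxKey; // null means no upper bound was computed

  public KeyIntervalFilterSketch(RecordIdentifier minKey, RecordIdentifier maxKey) {
    this.minKey = minKey;
    this.maxKey = maxKey;
  }

  /** A delete event only needs to be loaded if its ROW__ID can match a row in the split. */
  public boolean shouldLoad(RecordIdentifier deleteEventRowId) {
    if (minKey != null && deleteEventRowId.compareTo(minKey) < 0) {
      return false; // sorts before every row in the split
    }
    if (maxKey != null && deleteEventRowId.compareTo(maxKey) > 0) {
      return false; // sorts after every row in the split
    }
    return true;
  }
}

With filtering off the tests expect KeyInterval(null, null), i.e. both bounds absent, so every delete event is loaded - which is exactly what the sketch above degenerates to when minKey and maxKey are null.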