diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8a561e5771..12700fbdb7 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1854,7 +1854,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + "into memory to optimize for performance. To prevent out-of-memory errors, this is a rough heuristic\n" + "that limits the total number of delete events that can be loaded into memory at once.\n" + "Roughly it has been set to 10 million delete events per bucket (~160 MB).\n"), - + FILTER_DELETE_EVENTS("hive.txn.filter.delete.events", true, + "If true, VectorizedOrcAcidRowBatchReader will compute min/max " + + "ROW__ID for the split and only load delete events in that range.\n" + ), HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0, "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 9593975c6c..88e92ff11f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -983,7 +983,7 @@ public void process(Object row, int tag) throws HiveException { int writerOffset; // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same // for a given operator branch prediction should work quite nicely on it. - // RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we + // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we // pass the row rather than recordValue. if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { rowOutWriters[findWriterOffset(row)].write(recordValue); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index ea7ba53a3a..e38e21a3d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -176,6 +176,7 @@ protected int compareToInternal(RecordIdentifier other) { @Override public int compareTo(RecordIdentifier other) { if (other.getClass() != RecordIdentifier.class) { + //WTF? assumes that other instanceof OrcRawRecordMerger.ReaderKey??? return -other.compareTo(this); } return compareToInternal(other); @@ -219,17 +220,15 @@ public int hashCode() { @Override public String toString() { - BucketCodec codec = - BucketCodec.determineVersion(bucketId); - String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) + - "." + codec.decodeStatementId(bucketId) + ")"; - return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}"; + return "RecordIdentifier(" + writeId + ", " + bucketToString(bucketId) + "," + + getRowId() +")"; } - protected String bucketToString() { - if (bucketId == -1) return ("bucket: " + bucketId); + public static String bucketToString(int bucketId) { + if (bucketId == -1) return "" + bucketId; BucketCodec codec = BucketCodec.determineVersion(bucketId); - return "bucket: " + bucketId + "(" + codec.getVersion() + "." + - codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) + ")"; + return bucketId + "(" + codec.getVersion() + "." 
+ + codec.decodeWriterId(bucketId) + "." + + codec.decodeStatementId(bucketId) + ")"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 929ea9b1ed..dd008915a4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -168,7 +168,8 @@ int compareRow(RecordIdentifier other) { @Override public String toString() { return "{originalWriteId: " + getWriteId() + ", " + - bucketToString() + ", row: " + getRowId() + ", currentWriteId " + currentWriteId + "}"; + bucketToString(getBucketProperty()) + ", row: " + getRowId() + + ", currentWriteId " + currentWriteId + "}"; } } interface ReaderPair { @@ -665,19 +666,22 @@ private Reader advanceToNextFile() throws IOException { // The key of the next lowest reader. private ReaderKey secondaryKey = null; - private static final class KeyInterval { + static final class KeyInterval { private final RecordIdentifier minKey; private final RecordIdentifier maxKey; - private KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { + KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) { this.minKey = minKey; this.maxKey = maxKey; } - private RecordIdentifier getMinKey() { + RecordIdentifier getMinKey() { return minKey; } - private RecordIdentifier getMaxKey() { + RecordIdentifier getMaxKey() { return maxKey; } + public String toString() { + return "KeyInterval[" + minKey + "," + maxKey + "]"; + } } /** * Find the key range for original bucket files. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 1841cfaa2e..20c2a572fb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -44,11 +44,19 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.IntegerColumnStatistics; +import org.apache.orc.OrcConf; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; @@ -110,7 +118,7 @@ this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false); - final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); + final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. 
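A note on the `KeyInterval` type made package-visible above: it is simply an inclusive [minKey, maxKey] pair of `RecordIdentifier`s in which either end may be null, meaning unbounded. Below is a self-contained, illustrative sketch of those semantics (not part of the patch), with a plain (writeId, bucket, rowId) key standing in for `RecordIdentifier`; all names are made up for illustration:

```java
/** Illustrative only: an inclusive [min, max] interval over (writeId, bucket, rowId) keys. */
final class KeyIntervalSketch {
  static final class Key implements Comparable<Key> {
    final long writeId; final int bucket; final long rowId;
    Key(long writeId, int bucket, long rowId) {
      this.writeId = writeId; this.bucket = bucket; this.rowId = rowId;
    }
    @Override public int compareTo(Key o) {
      // lexicographic order: writeId, then bucket, then rowId
      if (writeId != o.writeId) return Long.compare(writeId, o.writeId);
      if (bucket != o.bucket) return Integer.compare(bucket, o.bucket);
      return Long.compare(rowId, o.rowId);
    }
  }

  private final Key min; // null means "no lower bound"
  private final Key max; // null means "no upper bound"

  KeyIntervalSketch(Key min, Key max) { this.min = min; this.max = max; }

  /** True if the key falls inside the inclusive interval. */
  boolean contains(Key k) {
    return (min == null || k.compareTo(min) >= 0)
        && (max == null || k.compareTo(max) <= 0);
  }
}
```

`RecordIdentifier.compareToInternal` orders keys the same way: by writeId, then bucket, then rowId.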
innerReader = reader.rowsOptions(readerOptions.range(offset, length), conf); baseReader = new org.apache.hadoop.mapred.RecordReader() { @@ -197,24 +205,27 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("VectorizedOrcAcidRowBatchReader:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); + LOG.info("Read ValidWriteIdList: " + this.validWriteIdList.toString() + + ":" + orcSplit); + + OrcRawRecordMerger.KeyInterval keyInterval = findMinMaxKeys(orcSplit, conf); // Clone readerOptions for deleteEvents. Reader.Options deleteEventReaderOptions = readerOptions.clone(); // Set the range on the deleteEventReaderOptions to 0 to INTEGER_MAX because // we always want to read all the delete delta files. deleteEventReaderOptions.range(0, Long.MAX_VALUE); - // Disable SARGs for deleteEventReaders, as SARGs have no meaning. - deleteEventReaderOptions.searchArgument(null, null); + setSARG(keyInterval, orcSplit, deleteEventReaderOptions); DeleteEventRegistry der; try { // See if we can load all the delete events from all the delete deltas in memory... - der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + der = new ColumnizedDeleteEventRegistry(conf, orcSplit, + deleteEventReaderOptions, keyInterval); } catch (DeleteEventsOverflowMemoryException e) { // If not, then create a set of hanging readers that do sort-merge to find the next smallest // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). - der = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + der = new SortMergedDeleteEventRegistry(conf, orcSplit, + deleteEventReaderOptions, keyInterval); } this.deleteEventRegistry = der; isOriginal = orcSplit.isOriginal(); @@ -232,6 +243,47 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList); } + /** + * Generates a SearchArgument to push down to delete_delta files. 
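For illustration, the predicate that `setSARG` pushes down can be written out directly with the same `SearchArgumentFactory` builder calls the patch imports. The sketch below (not part of the patch; class and method names are made up) folds both bounds into a single AND group for a fully known [min, max] interval, whereas the patch assembles the lower and upper halves separately because either end of the interval may be absent:

```java
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

/** Illustrative only: column-wise bounds on the ACID key columns of a delete_delta file. */
final class DeleteEventSargSketch {
  static SearchArgument rangeSarg(long minWriteId, long minBucket, long minRowId,
                                  long maxWriteId, long maxBucket, long maxRowId) {
    return SearchArgumentFactory.newBuilder()
        .startAnd()
          // lower bound: not(col < min) is col >= min, expressed per column
          .startNot().lessThan("originalTransaction", PredicateLeaf.Type.LONG, minWriteId).end()
          .startNot().lessThan("bucket", PredicateLeaf.Type.LONG, minBucket).end()
          .startNot().lessThan("rowId", PredicateLeaf.Type.LONG, minRowId).end()
          // upper bound: col <= max, expressed per column
          .lessThanEquals("originalTransaction", PredicateLeaf.Type.LONG, maxWriteId)
          .lessThanEquals("bucket", PredicateLeaf.Type.LONG, maxBucket)
          .lessThanEquals("rowId", PredicateLeaf.Type.LONG, maxRowId)
        .end()
        .build();
  }
}
```

As with any SARG, ORC uses this only to skip stripes and row groups whose column statistics show the predicate cannot be satisfied; rows inside surviving row groups are still returned, which is why the per-event range check added further down (isDeleteEventInRange) is still needed.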
+ */ + private static void setSARG(OrcRawRecordMerger.KeyInterval keyInterval, + OrcSplit orcSplit, Reader.Options deleteEventReaderOptions) { + SearchArgument.Builder b = null; + if(keyInterval.getMinKey() != null) { + RecordIdentifier k = keyInterval.getMinKey(); + b = SearchArgumentFactory.newBuilder(); + b.startAnd() //not(ot < 7) -> ot >=7 + .startNot().lessThan("originalTransaction", + PredicateLeaf.Type.LONG, k.getWriteId()).end() + .startNot().lessThan("bucket", + PredicateLeaf.Type.LONG, (long) k.getBucketProperty()).end() + .startNot().lessThan("rowId", + PredicateLeaf.Type.LONG, k.getRowId()).end() + .end(); + } + if(keyInterval.getMaxKey() != null) { + RecordIdentifier k = keyInterval.getMaxKey(); + if(b == null) { + b = SearchArgumentFactory.newBuilder(); + } + b.startAnd() + .lessThanEquals("originalTransaction", + PredicateLeaf.Type.LONG, k.getWriteId()) + .lessThanEquals("bucket", + PredicateLeaf.Type.LONG, (long) k.getBucketProperty()) + .lessThanEquals("rowId", + PredicateLeaf.Type.LONG, k.getRowId()) + .end(); + } + if(b != null) { + SearchArgument sarg = b.build(); + LOG.debug("deleteReader SARG(" + sarg + ") "); + deleteEventReaderOptions.searchArgument(sarg, + new String[] {"originalTransaction", "bucket", "rowId"}); + return; + } + deleteEventReaderOptions.searchArgument(null, null); + } public void setBaseAndInnerReader( final org.apache.hadoop.mapred.RecordReader baseReader) { this.baseReader = baseReader; @@ -239,6 +291,150 @@ public void setBaseAndInnerReader( this.vectorizedRowBatchBase = baseReader.createValue(); } + /** + * A given ORC reader will always process one or more whole stripes but the + * split boundaries may not line up with stripe boundaries if the InputFormat + * doesn't understand ORC specifics. So first we need to figure out which + * stripe(s) we are reading. + * + * Suppose txn1 writes 100K rows + * and txn2 writes 100 rows so we have events + * {1,0,0}....{1,0,100K},{2,0,0}...{2,0,100} in 2 files + * After compaction we may have 2 stripes + * {1,0,0}...{1,0,90K},{1,0,90001}...{2,0,100} + * + * Now suppose there is a delete stmt that deletes every row. So when we load + * the 2nd stripe, if we just look at stripe ColumnStatistics, + * minKey={1,0,100}, maxKey={2,0,90001}, all but the 1st 100 delete events + * will get loaded. But with hive.acid.index, minKey={1,0,90001} and + * maxKey={2,0,100} so we only load about 10K deletes. + * + * Also, even with Query Based compactor (once we have it), FileSinkOperator + * uses OrcRecordWriter to write to file, so we should have the + * hive.acid.index in place. + * + * If reading the 1st stripe, we don't have the start event, so we'll get it + * from stats, which will strictly speaking be accurate only wrt writeId and + * bucket but that is good enough. + * + * todo: how to test this? + * + * @param orcSplit + * @return empty KeyInterval if KeyInterval could not be determined + * @throws IOException + */ + private static OrcRawRecordMerger.KeyInterval findMinMaxKeys( + OrcSplit orcSplit, Configuration conf) throws IOException { + if(!HiveConf.getBoolVar(conf, ConfVars.FILTER_DELETE_EVENTS)) { + LOG.debug("findMinMaxKeys() " + ConfVars.FILTER_DELETE_EVENTS + "=false"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + if(orcSplit.isOriginal()) { + /** + * Among originals we may have files with _copy_N suffix. 
To properly + * generate a synthetic ROW___ID for them we need + * {@link OffsetAndBucketProperty} which could be an expensive computation + * if there are lots of copy_N files for a given bucketId. + * {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, + * Reader.Options, Configuration, OrcRawRecordMerger.Options)}*/ + LOG.debug("findMinMaxKeys(original split) - ignoring"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + //todo: since we already have OrcSplit.orcTail, should somehow use it to + // get the acid.index, stats, etc rather than fetching the footer again + // though it seems that orcTail is mostly null.... + Reader reader = OrcFile.createReader(orcSplit.getPath(), + OrcFile.readerOptions(conf)); + List stripes = reader.getStripes(); + final long splitStart = orcSplit.getStart(); + final long splitEnd = splitStart + orcSplit.getLength(); + int firstSripeIndex = -1; + int lastStripeIndex = -1; + for(int i = 0; i < stripes.size(); i++) { + StripeInformation stripe = stripes.get(i); + long stripeEnd = stripe.getOffset() + stripe.getLength(); + if(firstSripeIndex == -1 && stripe.getOffset() >= splitStart) { + firstSripeIndex = i; + } + if(lastStripeIndex == -1 && splitEnd <= stripeEnd && + stripes.get(firstSripeIndex).getOffset() <= stripe.getOffset() ) { + //the last condition is for when both splitStart and splitEnd are in + // the same stripe + lastStripeIndex = i; + } + } + if(lastStripeIndex == -1) { + //split goes to the EOF which is > end of stripe since file has a footer + assert stripes.get(stripes.size() - 1).getOffset() + + stripes.get(stripes.size() - 1).getLength() < splitEnd; + lastStripeIndex = stripes.size() - 1; + } + if(firstSripeIndex == -1 || lastStripeIndex == -1) { + LOG.warn("Could not find stripe (" + firstSripeIndex + "," + + lastStripeIndex + ")"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader); + if(keyIndex == null || keyIndex.length != stripes.size()) { + LOG.warn("Could not find keyIndex or length doesn't match (" + + firstSripeIndex + "," + lastStripeIndex + "," + stripes.size() + "," + + (keyIndex == null ? -1 : keyIndex.length) + ")"); + return new OrcRawRecordMerger.KeyInterval(null, null); + } + RecordIdentifier minKey = null; + if(firstSripeIndex > 0) { + //valid keys are strictly > than this key + minKey = keyIndex[firstSripeIndex - 1]; + //add 1 to make comparison >= to match the case of 0th stripe + minKey.setRowId(minKey.getRowId() + 1); + } + else { + List stats = reader.getStripeStatistics(); + assert stripes.size() == stats.size() : "str.s=" + stripes.size() + + " sta.s=" + stats.size(); + ColumnStatistics[] colStats = stats.get(firstSripeIndex).getColumnStatistics(); + /** + * If {@link OrcConf.ROW_INDEX_STRIDE} is set to 0 all column stats on + * ORC file are disabled though objects for them exist but and have + * min/max set to MIN_LONG/MAX_LONG so we only use column stats if they + * are actually computed. Streaming ingest used to set it 0 and Minor + * compaction so there are lots of legacy files with no (rather bad) + * column stats*/ + if(!(reader.getNumberOfRows() > 0 && + colStats[0].getNumberOfValues() < reader.getNumberOfRows())) { + /* + Structure in data is like this: + > + The +1 is to account for the top level struct which has a + ColumnStatistics object in colsStats. 
Top level struct is normally + dropped by the Reader (I guess because of orc.impl.SchemaEvolution) + */ + IntegerColumnStatistics origWriteId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ORIGINAL_WRITEID + 1]; + IntegerColumnStatistics bucketProperty = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.BUCKET + 1]; + IntegerColumnStatistics rowId = (IntegerColumnStatistics) + colStats[OrcRecordUpdater.ROW_ID + 1]; + //we may want to change bucketProperty from int to long in the + // future(across the stack) this protects the following cast to int + assert bucketProperty.getMinimum() <= Integer.MAX_VALUE : + "was bucketProper changed to a long (" + + bucketProperty.getMinimum() + ")?!:" + orcSplit; + //this a lower bound but not necessarily greatest lower bound + minKey = new RecordIdentifier(origWriteId.getMinimum(), + (int) bucketProperty.getMinimum(), rowId.getMinimum()); + } + else { + LOG.debug("findMinMaxKeys() No ORC column stats"); + } + } + + OrcRawRecordMerger.KeyInterval keyInterval = + new OrcRawRecordMerger.KeyInterval(minKey, keyIndex[lastStripeIndex]); + LOG.info("findMinMaxKeys(): " + keyInterval + + " stripes(" + firstSripeIndex + "," + lastStripeIndex + ")"); + return keyInterval; + } /** * Used for generating synthetic ROW__IDs for reading "original" files */ @@ -665,16 +861,18 @@ DeleteEventRegistry getDeleteEventRegistry() { private Boolean isDeleteRecordAvailable = null; private ValidWriteIdList validWriteIdList; - SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) - throws IOException { + SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions, + OrcRawRecordMerger.KeyInterval keyInterval) throws IOException { + /*todo: need to push keyInterval down into OrcRawRecordMerger + * so that it filters events. OrcRawRecordMerger has its own KeyInterval + * logic that is a bit different from this..... oy*/ final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltas.length > 0) { int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("SortMergedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); + LOG.debug("Using SortMergedDeleteEventRegistry"); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true); assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly"; this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, @@ -810,8 +1008,10 @@ public void close() throws IOException { static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** * A simple wrapper class to hold the (owid, bucketProperty, rowId) pair. + * todo: why not use {@link RecordIdentifier}?? 
*/ static class DeleteRecordKey implements Comparable { + private static final DeleteRecordKey otherKey = new DeleteRecordKey(); private long originalWriteId; /** * see {@link BucketCodec} @@ -844,9 +1044,18 @@ public int compareTo(DeleteRecordKey other) { } return 0; } + private int compareTo(RecordIdentifier other) { + if (other == null) { + return -1; + } + otherKey.set(other.getWriteId(), other.getBucketProperty(), + other.getRowId()); + return compareTo(otherKey); + } @Override public String toString() { - return "owid: " + originalWriteId + " bucketP:" + bucketProperty + " rowid: " + rowId; + return "DeleteRecordKey(" + originalWriteId + "," + + RecordIdentifier.bucketToString(bucketProperty) + "," + rowId +")"; } } @@ -868,10 +1077,17 @@ public String toString() { private boolean isBucketPropertyRepeating; private final boolean isBucketedTable; private final Reader reader; - - DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, - ValidWriteIdList validWriteIdList, boolean isBucketedTable, final JobConf conf) throws IOException { + private final Path deleteDeltaFile; + private final OrcRawRecordMerger.KeyInterval keyInterval; + private final OrcSplit orcSplit; + + DeleteReaderValue(Reader deleteDeltaReader, Path deleteDeltaFile, + Reader.Options readerOptions, int bucket, ValidWriteIdList validWriteIdList, + boolean isBucketedTable, final JobConf conf, + OrcRawRecordMerger.KeyInterval keyInterval, OrcSplit orcSplit) + throws IOException { this.reader = deleteDeltaReader; + this.deleteDeltaFile = deleteDeltaFile; this.recordReader = deleteDeltaReader.rowsOptions(readerOptions, conf); this.bucketForSplit = bucket; final boolean useDecimal64ColumnVector = HiveConf.getVar(conf, ConfVars @@ -887,7 +1103,11 @@ public String toString() { this.indexPtrInBatch = 0; this.validWriteIdList = validWriteIdList; this.isBucketedTable = isBucketedTable; - checkBucketId();//check 1st batch + if(batch != null) { + checkBucketId();//check 1st batch + } + this.keyInterval = keyInterval; + this.orcSplit = orcSplit; } public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { @@ -910,13 +1130,30 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { checkBucketId(deleteRecordKey.bucketProperty); } ++indexPtrInBatch; + if(!isDeleteEventInRange(keyInterval, deleteRecordKey)) { + continue; + } if (validWriteIdList.isWriteIdValid(currentWriteId)) { isValidNext = true; } } return true; } - + static boolean isDeleteEventInRange( + OrcRawRecordMerger.KeyInterval keyInterval, + DeleteRecordKey deleteRecordKey) { + if(keyInterval.getMinKey() != null && + deleteRecordKey.compareTo(keyInterval.getMinKey()) < 0) { + //current deleteEvent is < than minKey + return false; + } + if(keyInterval.getMaxKey() != null && + deleteRecordKey.compareTo(keyInterval.getMaxKey()) > 0) { + //current deleteEvent is > than maxKey + return false; + } + return true; + } public void close() throws IOException { this.recordReader.close(); } @@ -964,11 +1201,11 @@ private void checkBucketId(int bucketPropertyFromRecord) throws IOException { .decodeWriterId(bucketPropertyFromRecord); if(bucketIdFromRecord != bucketForSplit) { DeleteRecordKey dummy = new DeleteRecordKey(); - long curTxnId = setCurrentDeleteKey(dummy); + setCurrentDeleteKey(dummy); throw new IOException("Corrupted records with different bucket ids " - + "from the containing bucket file found! 
Expected bucket id " - + bucketForSplit + ", however found the bucket id " + bucketIdFromRecord + - " from " + dummy + " curTxnId: " + curTxnId); + + "from the containing bucket file found! Expected bucket id " + + bucketForSplit + ", however found " + dummy + + ". (" + orcSplit + "," + deleteDeltaFile + ")"); } } @@ -1030,14 +1267,15 @@ public int compareTo(CompressedOwid other) { private Boolean isEmpty = null; ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, - Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { + Reader.Options readerOptions, + OrcRawRecordMerger.KeyInterval keyInterval) + throws IOException, DeleteEventsOverflowMemoryException { int bucket = AcidUtils.parseBucketId(orcSplit.getPath()); String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); - LOG.debug("ColumnizedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString() - + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); - this.sortMerger = new TreeMap(); + LOG.debug("Using ColumnizedDeleteEventRegistry"); + this.sortMerger = new TreeMap<>(); this.rowIds = null; this.compressedOwids = null; int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); @@ -1059,6 +1297,9 @@ public int compareTo(CompressedOwid other) { // NOTE: A check for existence of deleteDeltaFile is required because we may not have // deletes for the bucket being taken into consideration for this split processing. if (length != -1 && fs.exists(deleteDeltaFile)) { + /** + * todo: we have OrcSplit.orcTail so we should be able to get stats from there + */ Reader deleteDeltaReader = OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf).maxLength(length)); AcidStats acidStats = OrcAcidUtils.parseAcidStats(deleteDeltaReader); @@ -1067,6 +1308,16 @@ public int compareTo(CompressedOwid other) { } totalDeleteEventCount += acidStats.deletes; if (totalDeleteEventCount > maxEventsInMemory) { + /*todo: this estimation is suboptimal especially if we have + min/maxKey for the split. The files may have many more + deletes than actually need to be loaded in memory (which is + the point of applying KeyInterval). + When does PPD apply? presumably at read time. + see RecordReaderImpl.SargApplier.pickRowGroups + So we should start loading and count events as we load and + stop if we get too much This means we have resize array + dynamically in readAllDeleteEventsFromDeleteDeltas()*/ + // ColumnizedDeleteEventRegistry loads all the delete events from all the delete deltas // into memory. To prevent out-of-memory errors, this check is a rough heuristic that // prevents creation of an object of this class if the total number of delete events @@ -1079,9 +1330,15 @@ public int compareTo(CompressedOwid other) { throw new DeleteEventsOverflowMemoryException(); } DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader, - readerOptions, bucket, validWriteIdList, isBucketedTable, conf); + deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf, + keyInterval, orcSplit); DeleteRecordKey deleteRecordKey = new DeleteRecordKey(); if (deleteReaderValue.next(deleteRecordKey)) { + //todo: this is problematic - if there are lots of + //delete_deltaS, they are all opened at once and ORC buffers + //may lead to OOM. Need to have aggressive compaction of + //deletes. 
This may be alleviated some by KeyInterval if + // every delete event in a given file is filtered out sortMerger.put(deleteRecordKey, deleteReaderValue); } else { deleteReaderValue.close(); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index c5589b953d..c117849aa4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -716,7 +716,7 @@ public void testNonAcidToAcidConversion3() throws Exception { int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default) + // 2. Convert NONACIDORCTBL to ACID table runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
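One detail from ColumnizedDeleteEventRegistry worth visualizing is the TreeMap-based sort merge: each delete_delta file contributes a reader positioned at its smallest surviving key, the map keeps the readers ordered by that key, and events are drained in global sort order. Below is a simplified, self-contained sketch of that pattern (not the patch's code), with iterators over pre-sorted longs standing in for DeleteRecordKey/DeleteReaderValue pairs:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: drain several sorted streams in global order, TreeMap-style. */
final class SortMergeSketch {
  static void drain(List<Iterator<Long>> sortedStreams) {
    // key = current smallest value of a stream, value = the stream itself
    // (a plain TreeMap assumes keys from different streams are distinct)
    TreeMap<Long, Iterator<Long>> merger = new TreeMap<>();
    for (Iterator<Long> s : sortedStreams) {
      if (s.hasNext()) {
        merger.put(s.next(), s);              // seed with each stream's first key
      }
    }
    while (!merger.isEmpty()) {
      Map.Entry<Long, Iterator<Long>> smallest = merger.pollFirstEntry();
      System.out.println(smallest.getKey());  // consume the globally smallest key
      Iterator<Long> stream = smallest.getValue();
      if (stream.hasNext()) {
        merger.put(stream.next(), stream);    // re-insert the stream under its next key
      }
    }
  }
}
```

Calling drain with iterators over (1, 4) and (2, 3) prints 1 2 3 4. The registry does the analogous thing with DeleteRecordKey/DeleteReaderValue, except that each reader also skips events outside the KeyInterval (via isDeleteEventInRange) before exposing its next key.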