diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 650f2af63d..814782a503 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -54,7 +53,6 @@
   private static final Logger LOG = LoggerFactory.getLogger(OrcRawRecordMerger.class);
 
-  private final Configuration conf;
   private final boolean collapse;
   private final RecordReader baseReader;
   private final ObjectInspector objectInspector;
@@ -186,30 +184,37 @@ public String toString() {
         currentTransactionId + ", statementId: "+ statementId + "}";
     }
   }
-
+  interface ReaderPair {
+    OrcStruct nextRecord();
+    int getColumns();
+    RecordReader getRecordReader();
+    Reader getReader();
+    RecordIdentifier getMinKey();
+    RecordIdentifier getMaxKey();
+    ReaderKey getKey();
+    void next(OrcStruct next) throws IOException;
+  }
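Extracting the ReaderPair contract into an interface decouples the merge loop from real ORC readers, so tests can feed the merger from memory instead of mocking file readers. A minimal sketch of such a test double follows (hypothetical, not part of this patch; the class name and canned-events constructor are invented):

  /** Hypothetical in-memory ReaderPair that serves pre-built ACID event rows. */
  static final class StubReaderPair implements OrcRawRecordMerger.ReaderPair {
    private final OrcStruct[] events;   // canned events, already sorted by key
    private final OrcRawRecordMerger.ReaderKey key;
    private OrcStruct nextRecord;
    private int pos = 0;

    StubReaderPair(OrcRawRecordMerger.ReaderKey key, OrcStruct... events)
        throws IOException {
      this.key = key;
      this.events = events;
      next(null);   // honor the read-ahead contract: buffer the first record
    }
    @Override public OrcStruct nextRecord() { return nextRecord; }
    @Override public OrcRawRecordMerger.ReaderKey getKey() { return key; }
    @Override public void next(OrcStruct previous) throws IOException {
      nextRecord = pos < events.length ? events[pos++] : null;
      if (nextRecord != null) {   // mirror ReaderPairAcid: refresh key from the row
        key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
            OrcRecordUpdater.getBucket(nextRecord),
            OrcRecordUpdater.getRowId(nextRecord),
            OrcRecordUpdater.getCurrentTransaction(nextRecord), 0);
      }
    }
    // not consulted by the merge loop itself; real pairs wrap actual ORC readers
    @Override public int getColumns() { return 0; }
    @Override public RecordReader getRecordReader() { return null; }
    @Override public Reader getReader() { return null; }
    @Override public RecordIdentifier getMinKey() { return null; }
    @Override public RecordIdentifier getMaxKey() { return null; }
  }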
   /**
    * A reader and the next record from that reader. The code reads ahead so that
    * we can return the lowest ReaderKey from each of the readers. Thus, the
    * next available row is nextRecord and only following records are still in
    * the reader.
    */
-  static class ReaderPair {
-    OrcStruct nextRecord;
-    final Reader reader;
-    final RecordReader recordReader;
-    final ReaderKey key;
+  @VisibleForTesting
+  final static class ReaderPairAcid implements ReaderPair {
+    private OrcStruct nextRecord;
+    private final Reader reader;
+    private final RecordReader recordReader;
+    private final ReaderKey key;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
-    final int bucket;
     private final int statementId;
-    boolean advancedToMinKey = false;
 
     /**
      * Create a reader that reads from the first key larger than minKey to any
      * keys equal to maxKey.
      * @param key the key to read into
      * @param reader the ORC file reader
-     * @param bucket the bucket number for the file
      * @param minKey only return keys larger than minKey if it is non-null
      * @param maxKey only return keys less than or equal to maxKey if it is
      *               non-null
@@ -217,70 +222,71 @@ public String toString() {
      * @param statementId id of SQL statement within a transaction
      * @throws IOException
      */
-    ReaderPair(ReaderKey key, Reader reader, int bucket,
-               RecordIdentifier minKey, RecordIdentifier maxKey,
-               ReaderImpl.Options options, int statementId) throws IOException {
+    @VisibleForTesting
+    ReaderPairAcid(ReaderKey key, Reader reader,
+                   RecordIdentifier minKey, RecordIdentifier maxKey,
+                   ReaderImpl.Options options, int statementId) throws IOException {
       this.reader = reader;
       this.key = key;
-      this.minKey = minKey;
-      this.maxKey = maxKey;
-      this.bucket = bucket;
       // TODO use stripe statistics to jump over stripes
       recordReader = reader.rowsOptions(options);
       this.statementId = statementId;
+      this.minKey = minKey;
+      this.maxKey = maxKey;
+      // advance the reader until we reach the minimum key
+      do {
+        next(nextRecord());
+      } while (nextRecord() != null &&
+          (minKey != null && key.compareRow(getMinKey()) <= 0));
+    }
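The constructor now performs the seek that the removed advnaceToMinKey() hook used to do, so a pair is fully positioned the moment it is constructed. The same pattern, shown standalone (a sketch only; the helper name is invented):

  // Read ahead until the buffered row sorts strictly after minKey; compareRow()
  // ignores the currentTransactionId component of the key.
  static void seekPastMinKey(OrcRawRecordMerger.ReaderPair pair,
      RecordIdentifier minKey) throws IOException {
    do {
      pair.next(pair.nextRecord());
    } while (pair.nextRecord() != null
        && minKey != null && pair.getKey().compareRow(minKey) <= 0);
  }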
+    @Override public final OrcStruct nextRecord() {
+      return nextRecord;
     }
-    RecordReader getRecordReader() {
+    @Override
+    public final int getColumns() {
+      return getReader().getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
+    }
+
+    @Override public RecordReader getRecordReader() {
       return recordReader;
     }
-    /**
-     * This must be called right after the constructor but not in the constructor to make sure
-     * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called
-     */
-    void advnaceToMinKey() throws IOException {
-      advancedToMinKey = true;
-      // advance the reader until we reach the minimum key
-      do {
-        next(nextRecord);
-      } while (nextRecord != null &&
-          (getMinKey() != null && key.compareRow(getMinKey()) <= 0));
+    @Override public Reader getReader() { return reader; }
+    @Override public RecordIdentifier getMinKey() {
+      return minKey;
+    }
+    @Override public RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
+    @Override public ReaderKey getKey() {
+      return key;
     }
 
-    void next(OrcStruct next) throws IOException {
-      assert advancedToMinKey : "advnaceToMinKey() was not called";
+    @Override
+    public void next(OrcStruct next) throws IOException {
       if (getRecordReader().hasNext()) {
         nextRecord = (OrcStruct) getRecordReader().next(next);
         // set the key
-        key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
-            OrcRecordUpdater.getBucket(nextRecord),
-            OrcRecordUpdater.getRowId(nextRecord),
-            OrcRecordUpdater.getCurrentTransaction(nextRecord),
+        getKey().setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord()),
+            OrcRecordUpdater.getBucket(nextRecord()),
+            OrcRecordUpdater.getRowId(nextRecord()),
+            OrcRecordUpdater.getCurrentTransaction(nextRecord()),
             statementId);
         // if this record is larger than maxKey, we need to stop
-        if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
-          LOG.debug("key " + key + " > maxkey " + getMaxKey());
+        if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
+          LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
           nextRecord = null;
           getRecordReader().close();
         }
       } else {
         nextRecord = null;
-        recordReader.close();
+        getRecordReader().close();
       }
     }
-
-    RecordIdentifier getMinKey() {
-      return minKey;
-    }
-    RecordIdentifier getMaxKey() {
-      return maxKey;
-    }
-    int getColumns() {
-      return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
-    }
   }
 
   /**
-   * A reader that pretends an original base file is a new version base file.
+   * A reader that pretends an original base file is a new versioned base file.
    * It wraps the underlying reader's row with an ACID event object and
    * makes the relevant translations.
    *
@@ -297,178 +303,38 @@ int getColumns() {
    * {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each
    * split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for
    * the rows of the 'original' file of course, must be assigned from the beginning of logical
-   * bucket.
+   * bucket. The last split of the logical bucket, i.e. the split that includes the end of the
+   * last file, must include all insert events from the deltas.
    */
-  static final class OriginalReaderPair extends ReaderPair {
-    private final Options mergerOptions;
-    /**
-     * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list
-     */
-    private long rowIdOffset = 0;
-    /**
-     * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order. This
-     * is the full list when compacting and empty when doing a simple read. The later is because we
-     * only need to read the current split from 1 file for simple read.
-     */
-    private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
-    /**
-     * index into {@link #originalFiles}
-     */
-    private int nextFileIndex = 0;
-    private long numRowsInCurrentFile = 0;
-    private RecordReader originalFileRecordReader = null;
-    private final Configuration conf;
-    private final Reader.Options options;
-    private final RecordIdentifier minKey;//shadow parent minKey to make final
-    private final RecordIdentifier maxKey;//shadow parent maxKey to make final
-
-    OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
-                       final RecordIdentifier minKey, final RecordIdentifier maxKey,
-                       Reader.Options options, Options mergerOptions, Configuration conf,
-                       ValidTxnList validTxnList) throws IOException {
-      super(key, reader, bucket, minKey, maxKey, options, 0);
-      this.mergerOptions = mergerOptions;
-      this.conf = conf;
-      this.options = options;
-      assert mergerOptions.getRootPath() != null : "Since we have original files";
-      assert bucket >= 0 : "don't support non-bucketed tables yet";
+  private static abstract class OriginalReaderPair implements ReaderPair {
+    OrcStruct nextRecord;
+    private final ReaderKey key;
+    final int bucketId;
 
-      RecordIdentifier newMinKey = minKey;
-      RecordIdentifier newMaxKey = maxKey;
-      if(mergerOptions.isCompacting()) {
-        {
-          //when compacting each split needs to process the whole logical bucket
-          assert options.getOffset() == 0;
-          assert options.getMaxOffset() == Long.MAX_VALUE;
-          assert minKey == null;
-          assert maxKey == null;
-        }
-        AcidUtils.Directory directoryState = AcidUtils.getAcidState(
-            mergerOptions.getRootPath(), conf, validTxnList, false, true);
-        originalFiles = directoryState.getOriginalFiles();
-        assert originalFiles.size() > 0;
-        /**
-         * when there are no copyN files, the {@link #recordReader} will be the the one and only
-         * file for for 'bucket' but closing here makes flow cleaner and only happens once in the
-         * life of the table. With copyN files, the caller may pass in any one of the copyN files.
-         * This is less prone to bugs than expecting the reader to pass in a Reader for the 1st file
-         * of a logical bucket.*/
-        recordReader.close();
-        reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
-        if(reader == null) {
-          //Compactor generated a split for a bucket that has no data?
-          throw new IllegalStateException("No 'original' files found for bucketId=" + bucket +
-              " in " + mergerOptions.getRootPath());
-        }
-        numRowsInCurrentFile = reader.getNumberOfRows();
-        originalFileRecordReader = reader.rowsOptions(options);
-      }
-      else {
-        /**
-         * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't
-         * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
-         * It's needed to correctly set maxKey. In particular, set maxKey==null if this split
-         * is the tail of the last file for this logical bucket to include all deltas written after
-         * non-acid to acid table conversion.
-         */
-        boolean isLastFileForThisBucket = false;
-        boolean haveSeenCurrentFile = false;
-        originalFiles = Collections.emptyList();
-        if (mergerOptions.getCopyIndex() > 0) {
-          //the split is from something other than the 1st file of the logical bucket - compute offset
-
-          AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-              conf, validTxnList, false, true);
-          for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-            AcidOutputFormat.Options bucketOptions =
-                AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucketId() != bucket) {
-              continue;
-            }
-            if(haveSeenCurrentFile) {
-              //if here we already saw current file and now found another file for the same bucket
-              //so the current file is not the last file of the logical bucket
-              isLastFileForThisBucket = false;
-              break;
-            }
-            if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
-              /**
-               * found the file whence the current split is from so we're done
-               * counting {@link rowIdOffset}
-               */
-              haveSeenCurrentFile = true;
-              isLastFileForThisBucket = true;
-              continue;
-            }
-            Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
-                OrcFile.readerOptions(conf));
-            rowIdOffset += copyReader.getNumberOfRows();
-          }
-          if (rowIdOffset > 0) {
-            //rowIdOffset could be 0 if all files before current one are empty
-            /**
-             * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
-             * int, Reader.Options)} need to fix min/max key since these are used by
-             * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
-             * the key. Clear?
-             */
-            if (minKey != null) {
-              minKey.setRowId(minKey.getRowId() + rowIdOffset);
-            }
-            else {
-              /**
-               * If this is not the 1st file, set minKey 1 less than the start of current file
-               * (Would not need to set minKey if we knew that there are no delta files)
-               * {@link #advanceToMinKey()} needs this */
-              newMinKey = new RecordIdentifier(0, bucket, rowIdOffset - 1);
-            }
-            if (maxKey != null) {
-              maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
-            }
-          }
-        } else {
-          isLastFileForThisBucket = true;
-          AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-              conf, validTxnList, false, true);
-          int numFilesInBucket= 0;
-          for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-            AcidOutputFormat.Options bucketOptions =
-                AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucketId() == bucket) {
-              numFilesInBucket++;
-              if(numFilesInBucket > 1) {
-                isLastFileForThisBucket = false;
-                break;
-              }
-            }
-          }
-        }
-        originalFileRecordReader = recordReader;
-        if(!isLastFileForThisBucket && maxKey == null) {
-          /*
-           * If this is the last file for this bucket, maxKey == null means the split is the tail
-           * of the file so we want to leave it blank to make sure any insert events in delta
-           * files are included; Conversely, if it's not the last file, set the maxKey so that
-           * events from deltas that don't modify anything in the current split are excluded*/
-          newMaxKey = new RecordIdentifier(0, bucket,
-              rowIdOffset + reader.getNumberOfRows() - 1);
-        }
-      }
-      this.minKey = newMinKey;
-      this.maxKey = newMaxKey;
-    }
-    @Override RecordReader getRecordReader() {
-      return originalFileRecordReader;
+    OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+      this.key = key;
+      this.bucketId = bucketId;
+      assert bucketId >= 0 : "don't support non-bucketed tables yet";
     }
-    @Override RecordIdentifier getMinKey() {
-      return minKey;
+    @Override public final OrcStruct nextRecord() {
+      return nextRecord;
    }
-    @Override RecordIdentifier getMaxKey() {
-      return maxKey;
+    @Override
+    public int getColumns() {
+      return getReader().getTypes().get(0).getSubtypesCount();
    }
-    private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
-      if (originalFileRecordReader.hasNext()) {
+    @Override
+    public final ReaderKey getKey() { return key; }
+    /**
+     * The cumulative number of rows in all files of the logical bucket that precede the file
+     * represented by {@link #getRecordReader()}
+     */
+    abstract long getRowIdOffset();
+
+    final boolean nextFromCurrentFile(OrcStruct next) throws IOException {
+      if (getRecordReader().hasNext()) {
         //RecordReader.getRowNumber() produces a file-global row number even with PPD
-        long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset;
+        long nextRowId = getRecordReader().getRowNumber() + getRowIdOffset();
         // have to do initialization here, because the super's constructor
         // calls next and thus we need to initialize before our constructor
         // runs
@@ -476,17 +342,17 @@ private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
          nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS);
          IntWritable operation =
              new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
-        nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation);
-        nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+        nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
+        nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
             new LongWritable(0));
-        nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+        nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
             new LongWritable(0));
-        nextRecord.setFieldValue(OrcRecordUpdater.BUCKET,
-            new IntWritable(bucket));
-        nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
+        nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
+            new IntWritable(bucketId));
+        nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
             new LongWritable(nextRowId));
-        nextRecord.setFieldValue(OrcRecordUpdater.ROW,
-            originalFileRecordReader.next(null));
+        nextRecord().setFieldValue(OrcRecordUpdater.ROW,
+            getRecordReader().next(null));
       } else {
         nextRecord = next;
         ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
@@ -494,18 +360,18 @@ private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
         ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
             .set(0);
         ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
-            .set(bucket);
+            .set(bucketId);
         ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
             .set(0);
         ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
             .set(nextRowId);
-        nextRecord.setFieldValue(OrcRecordUpdater.ROW,
-            originalFileRecordReader.next(OrcRecordUpdater.getRow(next)));
+        nextRecord().setFieldValue(OrcRecordUpdater.ROW,
+            getRecordReader().next(OrcRecordUpdater.getRow(next)));
       }
-      key.setValues(0L, bucket, nextRowId, 0L, 0);
-      if (maxKey != null && key.compareRow(maxKey) > 0) {
+      key.setValues(0L, bucketId, nextRowId, 0L, 0);
+      if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("key " + key + " > maxkey " + maxKey);
+          LOG.debug("key " + key + " > maxkey " + getMaxKey());
         }
         return false;//reached End Of Split
       }
@@ -513,9 +379,199 @@ private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
      }
      return false;//reached EndOfFile
    }
+  }
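nextFromCurrentFile() is where a raw pre-ACID row gets dressed up as an ACID insert event. The translation, isolated into a standalone sketch (illustrative only; the helper name is invented, the field constants and setValues calls mirror the code above):

  // Wrap a raw original-file row as a synthetic ACID insert event.
  static OrcStruct wrapOriginalRow(Object rawRow, int bucketId, long rowId) {
    OrcStruct event = new OrcStruct(OrcRecordUpdater.FIELDS);
    event.setFieldValue(OrcRecordUpdater.OPERATION,
        new IntWritable(OrcRecordUpdater.INSERT_OPERATION));
    // original files predate ACID, so both transaction ids are 0
    event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, new LongWritable(0));
    event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, new LongWritable(0));
    event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucketId));
    // rowId = row number within this file + rows in all earlier files of the bucket
    event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId));
    event.setFieldValue(OrcRecordUpdater.ROW, rawRow);
    return event;
  }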
+  @VisibleForTesting
+  final static class OriginalReaderPairToRead extends OriginalReaderPair {
+    private final long rowIdOffset;
+    private final Reader reader;
+    private final RecordReader recordReader;
+    private final RecordIdentifier minKey;
+    private final RecordIdentifier maxKey;
+
+    OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
+                             final RecordIdentifier minKey, final RecordIdentifier maxKey,
+                             Reader.Options options, Options mergerOptions, Configuration conf,
+                             ValidTxnList validTxnList) throws IOException {
+      super(key, bucketId);
+      this.reader = reader;
+      assert !mergerOptions.isCompacting();
+      assert mergerOptions.getRootPath() != null : "Since we have original files";
+
+      RecordIdentifier newMinKey = minKey;
+      RecordIdentifier newMaxKey = maxKey;
+      recordReader = reader.rowsOptions(options);
+      /**
+       * Logically, each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copy_N, etc. We don't
+       * know N a priori, so if getCopyIndex() > 0 the current split is from one of the copy_N
+       * files. It's needed to correctly set maxKey. In particular, set maxKey==null if this split
+       * is the tail of the last file for this logical bucket, to include all deltas written after
+       * non-acid to acid table conversion.
+       */
+      boolean isLastFileForThisBucket = false;
+      boolean haveSeenCurrentFile = false;
+      long rowIdOffsetTmp = 0;
+      if (mergerOptions.getCopyIndex() > 0) {
+        //the split is from something other than the 1st file of the logical bucket - compute offset
+
+        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+            conf, validTxnList, false, true);
+        for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+          AcidOutputFormat.Options bucketOptions =
+              AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+          if (bucketOptions.getBucketId() != bucketId) {
+            continue;
+          }
+          if (haveSeenCurrentFile) {
+            //if here we already saw current file and now found another file for the same bucket
+            //so the current file is not the last file of the logical bucket
+            isLastFileForThisBucket = false;
+            break;
+          }
+          if (f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
+            /**
+             * found the file the current split comes from, so we're done counting
+             * {@link #rowIdOffset}
+             */
+            haveSeenCurrentFile = true;
+            isLastFileForThisBucket = true;
+            continue;
+          }
+          Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
+              OrcFile.readerOptions(conf));
+          rowIdOffsetTmp += copyReader.getNumberOfRows();
+        }
+        this.rowIdOffset = rowIdOffsetTmp;
+        if (rowIdOffset > 0) {
+          //rowIdOffset could be 0 if all files before current one are empty
+          /**
+           * Since {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int,
+           * Reader.Options)} has already run, the min/max keys must be shifted as well: they are
+           * compared against keys whose rowId is generated by {@link #next(OrcStruct)} using
+           * {@link #rowIdOffset}.
+           */
+          if (minKey != null) {
+            minKey.setRowId(minKey.getRowId() + rowIdOffset);
+          } else {
+            /**
+             * If this is not the 1st file, set minKey to 1 less than the first rowId of the
+             * current file. (We would not need to set minKey at all if we knew there were no
+             * delta files.) The seek-to-minKey loop at the end of this constructor relies on it.
+             */
+            newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+          }
+          if (maxKey != null) {
+            maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
+          }
+        }
+      } else {
+        rowIdOffset = 0;
+        isLastFileForThisBucket = true;
+        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+            conf, validTxnList, false, true);
+        int numFilesInBucket = 0;
+        for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+          AcidOutputFormat.Options bucketOptions =
+              AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+          if (bucketOptions.getBucketId() == bucketId) {
+            numFilesInBucket++;
+            if (numFilesInBucket > 1) {
+              isLastFileForThisBucket = false;
+              break;
+            }
+          }
+        }
+      }
+      if (!isLastFileForThisBucket && maxKey == null) {
+        /*
+         * If this is the last file for this bucket, maxKey == null means the split is the tail
+         * of the file so we want to leave it blank to make sure any insert events in delta
+         * files are included; Conversely, if it's not the last file, set the maxKey so that
+         * events from deltas that don't modify anything in the current split are excluded*/
+        newMaxKey = new RecordIdentifier(0, bucketId,
+            rowIdOffset + reader.getNumberOfRows() - 1);
+      }
+      this.minKey = newMinKey;
+      this.maxKey = newMaxKey;
+
+      // advance the reader until we reach the minimum key
+      do {
+        next(nextRecord());
+      } while (nextRecord() != null &&
+          (getMinKey() != null && this.getKey().compareRow(getMinKey()) <= 0));
+    }
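The heart of the constructor above is the row-id offset: the rowids of a copy_N split must start after the rows of every earlier file of the same logical bucket. Reduced to just that computation (a sketch under the same AcidUtils/OrcFile calls used above; the standalone helper name is invented and the last-file bookkeeping is omitted):

  // Sum the row counts of this bucket's files that precede the split's own file.
  static long computeRowIdOffset(List<HadoopShims.HdfsFileStatusWithId> originalFiles,
      int bucketId, Path splitPath, Configuration conf) throws IOException {
    long offset = 0;
    for (HadoopShims.HdfsFileStatusWithId f : originalFiles) {   // fixed sort order
      Path p = f.getFileStatus().getPath();
      if (AcidUtils.parseBaseOrDeltaBucketFilename(p, conf).getBucketId() != bucketId) {
        continue;                  // a different logical bucket
      }
      if (p.equals(splitPath)) {
        break;                     // reached the split's own file; stop counting
      }
      // every earlier file of this bucket shifts our rowids by its row count
      offset += OrcFile.createReader(p, OrcFile.readerOptions(conf)).getNumberOfRows();
    }
    return offset;
  }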
+    @Override public RecordReader getRecordReader() {
+      return recordReader;
+    }
+    @Override public Reader getReader() { return reader; }
+    @Override public RecordIdentifier getMinKey() { return minKey; }
+    @Override public RecordIdentifier getMaxKey() {
+      return maxKey;
+    }
+    @Override public long getRowIdOffset() { return rowIdOffset; }
+
+    @Override
+    public void next(OrcStruct next) throws IOException {
+      if(!nextFromCurrentFile(next)) {
+        //only have 1 file so done
+        nextRecord = null;
+        getRecordReader().close();
+      }
+    }
+  }
+  @VisibleForTesting
+  final static class OriginalReaderPairToCompact extends OriginalReaderPair {
+    /**
+     * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order.
+     * It includes all original files (for all buckets).
+     */
+    private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
+    /**
+     * index into {@link #originalFiles}
+     */
+    private int nextFileIndex = 0;
+    private Reader reader;
+    private RecordReader recordReader = null;
+    private final Configuration conf;
+    private final Reader.Options options;
+    private long rowIdOffset = 0;
+
+    OriginalReaderPairToCompact(ReaderKey key, int bucketId,
+                                Reader.Options options, Options mergerOptions, Configuration conf,
+                                ValidTxnList validTxnList) throws IOException {
+      super(key, bucketId);
+      assert mergerOptions.isCompacting() : "Should only be used for Compaction";
+      this.conf = conf;
+      this.options = options;
+      assert mergerOptions.getRootPath() != null : "Since we have original files";
+      assert this.bucketId >= 0 : "don't support non-bucketed tables yet";
+      //when compacting each split needs to process the whole logical bucket
+      assert options.getOffset() == 0;
+      assert options.getMaxOffset() == Long.MAX_VALUE;
+      AcidUtils.Directory directoryState = AcidUtils.getAcidState(
+          mergerOptions.getRootPath(), conf, validTxnList, false, true);
+      originalFiles = directoryState.getOriginalFiles();
+      assert originalFiles.size() > 0;
+      this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+      if (reader == null) {
+        //Compactor generated a split for a bucket that has no data?
+        throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
+            " in " + mergerOptions.getRootPath());
+      }
+      recordReader = getReader().rowsOptions(options);
+      next(nextRecord());//load 1st row
+    }
+    @Override public RecordReader getRecordReader() {
+      return recordReader;
+    }
+    @Override public Reader getReader() { return reader; }
+    @Override public RecordIdentifier getMinKey() {
+      return null;
+    }
+    @Override public RecordIdentifier getMaxKey() {
+      return null;
+    }
+    @Override public long getRowIdOffset() { return rowIdOffset; }
+
+    @Override
-    void next(OrcStruct next) throws IOException {
-      assert advancedToMinKey : "advnaceToMinKey() was not called";
+    public void next(OrcStruct next) throws IOException {
       while(true) {
         if(nextFromCurrentFile(next)) {
           return;
@@ -523,19 +579,17 @@ void next(OrcStruct next) throws IOException {
         }
         if (originalFiles.size() <= nextFileIndex) {
           //no more original files to read
           nextRecord = null;
-          originalFileRecordReader.close();
+          recordReader.close();
           return;
         } else {
-          assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting";
-          rowIdOffset += numRowsInCurrentFile;
-          originalFileRecordReader.close();
-          Reader reader = advanceToNextFile();
+          rowIdOffset += reader.getNumberOfRows();
+          recordReader.close();
+          reader = advanceToNextFile();
           if(reader == null) {
             nextRecord = null;
             return;
           }
-          numRowsInCurrentFile = reader.getNumberOfRows();
-          originalFileRecordReader = reader.rowsOptions(options);
+          recordReader = reader.rowsOptions(options);
         }
       }
     }
@@ -546,21 +600,19 @@ void next(OrcStruct next) throws IOException {
      */
     private Reader advanceToNextFile() throws IOException {
       while(nextFileIndex < originalFiles.size()) {
-        AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
-        if (bucketOptions.getBucketId() == bucket) {
+        AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(
+            originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
+        if (bucketOptions.getBucketId() == bucketId) {
           break;
         }
+        //not the bucket we care about here
         nextFileIndex++;
       }
       if(originalFiles.size() <= nextFileIndex) {
         return null;//no more files for current bucket
       }
-      return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf));
-    }
-
-    @Override
-    int getColumns() {
-      return reader.getTypes().get(0).getSubtypesCount();
+      return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().
+          getPath(), OrcFile.readerOptions(conf));
     }
   }
 
@@ -569,8 +621,7 @@ int getColumns() {
    * {@link ReaderKey} ascending. The output of this Reader should a global order across these
    * files. The root of this tree is always the next 'file' to read from.
    */
-  private final TreeMap<ReaderKey, ReaderPair> readers =
-      new TreeMap<ReaderKey, ReaderPair>();
+  private final TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();
 
   // The reader that currently has the lowest key.
   private ReaderPair primary;
@@ -637,7 +688,8 @@ private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
   }
 
   /**
-   * Find the key range for bucket files.
+   * Find the key range for the split (of the base). These bounds are used to filter the delta
+   * files, since base and deltas are both sorted by key.
    * @param reader the reader
    * @param options the options for reading with
    * @throws IOException
    */
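The filtering the javadoc above describes is just an interval check on sorted keys: a delta event is relevant to this split only if its row falls strictly after the split's minKey and no later than its maxKey. As a sketch (illustrative, not patch code; a null bound means "unbounded" exactly as in the readers above):

  static boolean inRange(RecordIdentifier candidate,
      RecordIdentifier minKey, RecordIdentifier maxKey) {
    return (minKey == null || candidate.compareTo(minKey) > 0)   // strictly after min
        && (maxKey == null || candidate.compareTo(maxKey) <= 0); // up to and incl. max
  }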
@@ -681,7 +733,7 @@ private KeyInterval discoverKeyBounds(Reader reader,
    */
   static Reader.Options createEventOptions(Reader.Options options) {
     Reader.Options result = options.clone();
-    result.range(options.getOffset(), Long.MAX_VALUE);
+    //result.range(options.getOffset(), Long.MAX_VALUE); TODO: why did the old code extend the
+    //range to the end of the file?
     result.include(options.getInclude());
 
     // slide the column names down by 6 for the name array
@@ -757,7 +809,6 @@ Path getBucketPath() {
                       ValidTxnList validTxnList,
                       Reader.Options options,
                       Path[] deltaDirectory, Options mergerOptions) throws IOException {
-    this.conf = conf;
     this.collapse = collapseEvents;
     this.offset = options.getOffset();
     this.length = options.getLength();
@@ -788,18 +839,23 @@ Path getBucketPath() {
     ReaderKey key = new ReaderKey();
     if (isOriginal) {
       options = options.clone();
-      pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-          options, mergerOptions, conf, validTxnList);
+      if(mergerOptions.isCompacting()) {
+        pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
+            conf, validTxnList);
+      }
+      else {
+        pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
+            keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+      }
     } else {
-      pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-          eventOptions, 0);
+      pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+          eventOptions, 0);
     }
     minKey = pair.getMinKey();
     maxKey = pair.getMaxKey();
     LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
-    pair.advnaceToMinKey();
     // if there is at least one record, put it in the map
-    if (pair.nextRecord != null) {
+    if (pair.nextRecord() != null) {
       readers.put(key, pair);
     }
     baseReader = pair.getRecordReader();
@@ -828,11 +884,9 @@ Path getBucketPath() {
           deltaEventOptions = eventOptions.clone().searchArgument(null, null);
         }
       }
-      ReaderPair deltaPair;
-      deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
-          maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
-      deltaPair.advnaceToMinKey();
-      if (deltaPair.nextRecord != null) {
+      ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+          deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+      if (deltaPair.nextRecord() != null) {
        readers.put(key, deltaPair);
      }
    }
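With every pair buffered one record ahead, the merger itself is a k-way merge over the TreeMap: take the pair with the lowest key, emit its buffered record, advance it, and re-insert it under its new key. A simplified sketch of one such step (the real next() below additionally caches a secondaryKey and handles the "collapse" logic):

  private OrcStruct nextLowest(TreeMap<ReaderKey, ReaderPair> readers,
      OrcStruct scratch) throws IOException {
    Map.Entry<ReaderKey, ReaderPair> lowest = readers.pollFirstEntry();
    if (lowest == null) {
      return null;                          // every reader is exhausted
    }
    ReaderPair pair = lowest.getValue();
    OrcStruct current = pair.nextRecord();  // buffered record with the lowest key
    pair.next(scratch);                     // read ahead using a separate scratch row
    if (pair.nextRecord() != null) {
      readers.put(pair.getKey(), pair);     // re-insert under its advanced key
    }
    return current;
  }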
@@ -883,8 +937,8 @@ public boolean next(RecordIdentifier recordIdentifier,
     while (keysSame && primary != null) {
       // The primary's nextRecord is the next value to return
-      OrcStruct current = primary.nextRecord;
-      recordIdentifier.set(primary.key);
+      OrcStruct current = primary.nextRecord();
+      recordIdentifier.set(primary.getKey());
 
       // Advance the primary reader to the next record
       primary.next(extraValue);
@@ -895,12 +949,12 @@ public boolean next(RecordIdentifier recordIdentifier,
       // now that the primary reader has advanced, we need to see if we
       // continue to read it or move to the secondary.
-      if (primary.nextRecord == null ||
-          primary.key.compareTo(secondaryKey) > 0) {
+      if (primary.nextRecord() == null ||
+          primary.getKey().compareTo(secondaryKey) > 0) {
 
         // if the primary isn't done, push it back into the readers
-        if (primary.nextRecord != null) {
-          readers.put(primary.key, primary);
+        if (primary.nextRecord() != null) {
+          readers.put(primary.getKey(), primary);
         }
 
         // update primary and secondaryKey
@@ -967,10 +1021,10 @@ public long getPos() throws IOException {
   @Override
   public void close() throws IOException {
     if (primary != null) {
-      primary.recordReader.close();
+      primary.getRecordReader().close();
     }
     for(ReaderPair pair: readers.values()) {
-      pair.recordReader.close();
+      pair.getRecordReader().close();
     }
   }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 2406af5013..ba8d6756ef 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -186,25 +185,24 @@ public void testReaderPair() throws Exception {
     Reader reader = createMockReader();
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
-    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, minKey, maxKey,
         new Reader.Options(), 0);
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketProperty());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
     Mockito.verify(recordReader).close();
   }
 
@@ -213,46 +211,45 @@
   public void testReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockReader();
-    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, null, null,
         new Reader.Options(), 0);
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(20, key.getRowId());
     assertEquals(100, key.getCurrentTransactionId());
-    assertEquals("first", value(pair.nextRecord));
+    assertEquals("first", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(30, key.getRowId());
     assertEquals(110, key.getCurrentTransactionId());
-    assertEquals("second", value(pair.nextRecord));
+    assertEquals("second", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketProperty());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketProperty());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketProperty());
     assertEquals(61, key.getRowId());
     assertEquals(140, key.getCurrentTransactionId());
-    assertEquals("fifth", value(pair.nextRecord));
+    assertEquals("fifth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
     Mockito.verify(recordReader).close();
   }
 
@@ -296,25 +293,24 @@ public void testOriginalReaderPair() throws Exception {
     Path root = new Path(tmpDir, "testOriginalReaderPair");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, minKey, maxKey,
         new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root),
         conf, new ValidReadTxnList());
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
     Mockito.verify(recordReader).close();
   }
 
@@ -331,46 +327,45 @@ public void testOriginalReaderPairNoMin() throws Exception {
     Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, null, null,
         new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf,
         new ValidReadTxnList());
-    pair.advnaceToMinKey();
-    assertEquals("first", value(pair.nextRecord));
+    assertEquals("first", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(0, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("second", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("second", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(1, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("third", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("third", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("fourth", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("fourth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("fifth", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("fifth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketProperty());
     assertEquals(4, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(pair.recordReader).close();
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
+    Mockito.verify(pair.getRecordReader()).close();
   }
 
   @Test
@@ -437,11 +432,11 @@ public void testNewBase() throws Exception {
     OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
         false, 10, createMaximalTxnList(),
         new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
-    RecordReader rr = merger.getCurrentReader().recordReader;
+    RecordReader rr = merger.getCurrentReader().getRecordReader();
     assertEquals(0, merger.getOtherReaders().size());
-    assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
-    assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
+    assertEquals("" + merger.getMinKey(), new RecordIdentifier(10, 20, 30), merger.getMinKey());
+    assertEquals("" + merger.getMaxKey(), new RecordIdentifier(40, 50, 60), merger.getMaxKey());
 
     RecordIdentifier id = merger.createKey();
     OrcStruct event = merger.createValue();