diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index ffcdf6ab09..293b0b46d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -179,23 +179,32 @@ public String toString() {
         currentTransactionId + ", statementId: "+ statementId + "}";
     }
   }
-
+  interface ReaderPair {
+    ReaderKey getKey();
+    OrcStruct nextRecord();
+    void next(OrcStruct next) throws IOException;
+    RecordIdentifier getMinKey();
+    RecordIdentifier getMaxKey();
+    int getColumns();
+    RecordReader getRecordReader();
+    //void advnaceToMinKey() throws IOException;//todo: move to c'tor
+  }
   /**
    * A reader and the next record from that reader. The code reads ahead so that
    * we can return the lowest ReaderKey from each of the readers. Thus, the
    * next available row is nextRecord and only following records are still in
    * the reader.
    */
-  static class ReaderPair {
-    OrcStruct nextRecord;
-    final Reader reader;
-    final RecordReader recordReader;
-    final ReaderKey key;
+  @VisibleForTesting
+  final static class ReaderPairAcid implements ReaderPair {
+    private OrcStruct nextRecord;
+    private final Reader reader;
+    private final RecordReader recordReader;
+    private final ReaderKey key;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
-    final int bucket;
     private final int statementId;
-    boolean advancedToMinKey = false;
+    //private boolean advancedToMinKey = false;
 
     /**
      * Create a reader that reads from the first key larger than minKey to any
@@ -210,36 +219,52 @@ public String toString() {
      * @param statementId id of SQL statement within a transaction
      * @throws IOException
      */
-    ReaderPair(ReaderKey key, Reader reader, int bucket,
-               RecordIdentifier minKey, RecordIdentifier maxKey,
-               ReaderImpl.Options options, int statementId) throws IOException {
+    ReaderPairAcid(ReaderKey key, Reader reader, int bucket,
+                   RecordIdentifier minKey, RecordIdentifier maxKey,
+                   ReaderImpl.Options options, int statementId) throws IOException {
       this.reader = reader;
       this.key = key;
       this.minKey = minKey;
       this.maxKey = maxKey;
-      this.bucket = bucket;
       // TODO use stripe statistics to jump over stripes
       recordReader = reader.rowsOptions(options);
       this.statementId = statementId;
-    }
-    RecordReader getRecordReader() {
-      return recordReader;
-    }
-    /**
-     * This must be called right after the constructor but not in the constructor to make sure
-     * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called
-     */
-    void advnaceToMinKey() throws IOException {
-      advancedToMinKey = true;
+      //advancedToMinKey = true;
       // advance the reader until we reach the minimum key
       do {
        next(nextRecord);
      } while (nextRecord != null &&
-          (getMinKey() != null && key.compareRow(getMinKey()) <= 0));
+        (minKey != null && key.compareRow(getMinKey()) <= 0));
+    }
+
+    /**
+     * C'tor that computes min/max key - i.e. to be used with base reader
+     * @param key
+     * @param reader
+     * @param bucket
+     * @param options
+     * @param statementId
+     * @throws IOException
+     */
+    ReaderPairAcid(ReaderKey key, Reader reader, int bucket,
+                   ReaderImpl.Options options, int statementId) throws IOException {
+      this(key, reader, bucket, null, null, options, statementId);
+    }
+    @Override
+    public RecordReader getRecordReader() {
+      return recordReader;
+    }
+    @Override
+    public OrcStruct nextRecord() {
+      return nextRecord;
+    }
+    @Override
+    public ReaderKey getKey() {
+      return key;
    }
 
-    void next(OrcStruct next) throws IOException {
-      assert advancedToMinKey : "advnaceToMinKey() was not called";
+    public void next(OrcStruct next) throws IOException {
+      //assert advancedToMinKey : "advnaceToMinKey() was not called";
       if (getRecordReader().hasNext()) {
         nextRecord = (OrcStruct) getRecordReader().next(next);
         // set the key
@@ -261,13 +286,13 @@ void next(OrcStruct next) throws IOException {
       }
     }
 
-    RecordIdentifier getMinKey() {
+    public RecordIdentifier getMinKey() {
       return minKey;
     }
-    RecordIdentifier getMaxKey() {
+    public RecordIdentifier getMaxKey() {
       return maxKey;
     }
-    int getColumns() {
+    public int getColumns() {
       return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
     }
   }
@@ -292,7 +317,12 @@ int getColumns() {
    * the rows of the 'original' file of course, must be assigned from the beginning of logical
    * bucket.
    */
-  static final class OriginalReaderPair extends ReaderPair {
+  static final class OriginalReaderPair implements ReaderPair {
+    private OrcStruct nextRecord;
+    final Reader reader;
+    final ReaderKey key;
+    final int bucket;
+    //boolean advancedToMinKey = false;
     private final Options mergerOptions;
     /**
      * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list
@@ -319,7 +349,9 @@ int getColumns() {
                        final RecordIdentifier minKey, final RecordIdentifier maxKey,
                        Reader.Options options, Options mergerOptions, Configuration conf,
                        ValidTxnList validTxnList) throws IOException {
-      super(key, reader, bucket, minKey, maxKey, options, 0);
+      this.key = key;
+      this.reader = reader;
+      this.bucket = bucket;
       this.mergerOptions = mergerOptions;
       this.conf = conf;
       this.options = options;
@@ -328,7 +360,7 @@ int getColumns() {
 
       RecordIdentifier newMinKey = minKey;
       RecordIdentifier newMaxKey = maxKey;
-      if(mergerOptions.isCompacting()) {
+      if (mergerOptions.isCompacting()) {
         {
           //when compacting each split needs to process the whole logical bucket
           assert options.getOffset() == 0;
@@ -346,17 +378,17 @@ int getColumns() {
          * life of the table. With copyN files, the caller may pass in any one of the copyN files.
          * This is less prone to bugs than expecting the reader to pass in a Reader for the 1st file
          * of a logical bucket.*/
-        recordReader.close();
+        //recordReader.close();
         reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
-        if(reader == null) {
+        if (reader == null) {
           //Compactor generated a split for a bucket that has no data?
           throw new IllegalStateException("No 'original' files found for bucketId=" + bucket +
             " in " + mergerOptions.getRootPath());
         }
         numRowsInCurrentFile = reader.getNumberOfRows();
         originalFileRecordReader = reader.rowsOptions(options);
-      }
-      else {
+      } else {
+        originalFileRecordReader = reader.rowsOptions(options);
         /**
          * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc We don't
          * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
@@ -369,7 +401,7 @@ int getColumns() {
           originalFiles = Collections.emptyList();
           if (mergerOptions.getCopyIndex() > 0) {
             //the split is from something other than the 1st file of the logical bucket - compute offset
-
+
             AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
               conf, validTxnList, false, true);
             for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
@@ -378,13 +410,13 @@ int getColumns() {
               if (bucketOptions.getBucket() != bucket) {
                 continue;
               }
-              if(haveSeenCurrentFile) {
+              if (haveSeenCurrentFile) {
                 //if here we already saw current file and now found another file for the same bucket
                 //so the current file is not the last file of the logical bucket
                 isLastFileForThisBucket = false;
                 break;
               }
-              if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
+              if (f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
                 /**
                  * found the file whence the current split is from so we're done
                  * counting {@link rowIdOffset}
@@ -406,8 +438,7 @@
              * the key. Clear? */
             if (minKey != null) {
               minKey.setRowId(minKey.getRowId() + rowIdOffset);
-            }
-            else {
+            } else {
               /**
                * If this is not the 1st file, set minKey 1 less than the start of current file
                * (Would not need to set minKey if we knew that there are no delta files)
@@ -422,21 +453,20 @@
           isLastFileForThisBucket = true;
           AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
             conf, validTxnList, false, true);
-          int numFilesInBucket= 0;
+          int numFilesInBucket = 0;
           for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
             AcidOutputFormat.Options bucketOptions =
               AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
             if (bucketOptions.getBucket() == bucket) {
               numFilesInBucket++;
-              if(numFilesInBucket > 1) {
+              if (numFilesInBucket > 1) {
                 isLastFileForThisBucket = false;
                 break;
               }
             }
           }
         }
-        originalFileRecordReader = recordReader;
-        if(!isLastFileForThisBucket && maxKey == null) {
+        if (!isLastFileForThisBucket && maxKey == null) {
          /*
           * If this is the last file for this bucket, maxKey == null means the split is the tail
           * of the file so we want to leave it blank to make sure any insert events in delta
@@ -448,16 +478,32 @@
       }
       this.minKey = newMinKey;
       this.maxKey = newMaxKey;
+
+      //advancedToMinKey = true;
+      // advance the reader until we reach the minimum key
+      do {
+        next(nextRecord);
+      } while (nextRecord != null &&
+        (getMinKey() != null && this.key.compareRow(getMinKey()) <= 0));
     }
-    @Override RecordReader getRecordReader() {
+    @Override
+    public OrcStruct nextRecord() {
+      return nextRecord;
+    }
+    @Override
+    public ReaderKey getKey() {
+      return key;
+    }
+    @Override public RecordReader getRecordReader() {
       return originalFileRecordReader;
     }
-    @Override RecordIdentifier getMinKey() {
+    @Override public RecordIdentifier getMinKey() {
       return minKey;
     }
-    @Override RecordIdentifier getMaxKey() {
+    @Override public RecordIdentifier getMaxKey() {
       return maxKey;
     }
+
     private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
       if (originalFileRecordReader.hasNext()) {
         //RecordReader.getRowNumber() produces a file-global row number even with PPD
@@ -507,8 +553,8 @@ private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
       return false;//reached EndOfFile
     }
     @Override
-    void next(OrcStruct next) throws IOException {
-      assert advancedToMinKey : "advnaceToMinKey() was not called";
+    public void next(OrcStruct next) throws IOException {
+      //assert advancedToMinKey : "advnaceToMinKey() was not called";
       while(true) {
         if(nextFromCurrentFile(next)) {
           return;
         }
@@ -552,7 +598,7 @@ private Reader advanceToNextFile() throws IOException {
     }
 
     @Override
-    int getColumns() {
+    public int getColumns() {
       return reader.getTypes().get(0).getSubtypesCount();
     }
   }
@@ -562,8 +608,7 @@ int getColumns() {
    * {@link ReaderKey} ascending. The output of this Reader should a global order across these
    * files. The root of this tree is always the next 'file' to read from.
    */
-  private final TreeMap<ReaderKey, ReaderPair> readers =
-    new TreeMap<ReaderKey, ReaderPair>();
+  private final TreeMap<ReaderKey, ReaderPair> readers = new TreeMap<>();
 
   // The reader that currently has the lowest key.
   private ReaderPair primary;
@@ -768,6 +813,7 @@ Path getBucketPath() {
       baseReader = null;
       minKey = maxKey = null;
     } else {
+      //we don't need to look for keys when compacting
       KeyInterval keyInterval;
       // find the min/max based on the offset and length (and more for 'original')
       if (isOriginal) {
@@ -784,15 +830,15 @@
         pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(),
           keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
       } else {
-        pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+        pair = new ReaderPairAcid(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
           eventOptions, 0);
       }
       minKey = pair.getMinKey();
       maxKey = pair.getMaxKey();
       LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
-      pair.advnaceToMinKey();
+      //pair.advnaceToMinKey();
       // if there is at least one record, put it in the map
-      if (pair.nextRecord != null) {
+      if (pair.nextRecord() != null) {
         readers.put(key, pair);
       }
       baseReader = pair.getRecordReader();
@@ -821,10 +867,10 @@ Path getBucketPath() {
           deltaEventOptions = eventOptions.clone().searchArgument(null, null);
         }
       }
-      ReaderPair deltaPair;
-      deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+      ReaderPairAcid deltaPair;
+      deltaPair = new ReaderPairAcid(key, deltaReader, bucket, minKey,
        maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
-      deltaPair.advnaceToMinKey();
+      //deltaPair.advnaceToMinKey();
       if (deltaPair.nextRecord != null) {
         readers.put(key, deltaPair);
       }
@@ -876,8 +922,8 @@ public boolean next(RecordIdentifier recordIdentifier,
     while (keysSame && primary != null) {
 
       // The primary's nextRecord is the next value to return
-      OrcStruct current = primary.nextRecord;
-      recordIdentifier.set(primary.key);
+      OrcStruct current = primary.nextRecord();
+      recordIdentifier.set(primary.getKey());
 
       // Advance the primary reader to the next record
       primary.next(extraValue);
@@ -888,12 +934,12 @@ public boolean next(RecordIdentifier recordIdentifier,
 
       // now that the primary reader has advanced, we need to see if we
       // continue to read it or move to the secondary.
-      if (primary.nextRecord == null ||
-          primary.key.compareTo(secondaryKey) > 0) {
+      if (primary.nextRecord() == null ||
+          primary.getKey().compareTo(secondaryKey) > 0) {
 
         // if the primary isn't done, push it back into the readers
-        if (primary.nextRecord != null) {
-          readers.put(primary.key, primary);
+        if (primary.nextRecord() != null) {
+          readers.put(primary.getKey(), primary);
         }
 
         // update primary and secondaryKey
@@ -960,10 +1006,10 @@ public long getPos() throws IOException {
   @Override
   public void close() throws IOException {
     if (primary != null) {
-      primary.recordReader.close();
+      primary.getRecordReader().close();
     }
     for(ReaderPair pair: readers.values()) {
-      pair.recordReader.close();
+      pair.getRecordReader().close();
     }
   }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 584bd3bbdf..71afc8d2cc 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -38,10 +38,8 @@
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.IntWritable;
@@ -187,25 +185,25 @@ public void testReaderPair() throws Exception {
     Reader reader = createMockReader();
     RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
-    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, 20, minKey, maxKey,
         new Reader.Options(), 0);
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    //pair.advnaceToMinKey();
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketId());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
 
     Mockito.verify(recordReader).close();
   }
@@ -214,46 +212,46 @@
     ReaderKey key = new ReaderKey();
     Reader reader = createMockReader();
 
-    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, 20, null, null,
         new Reader.Options(), 0);
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    //pair.advnaceToMinKey();
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
     assertEquals(20, key.getRowId());
     assertEquals(100, key.getCurrentTransactionId());
-    assertEquals("first", value(pair.nextRecord));
+    assertEquals("first", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
     assertEquals(30, key.getRowId());
     assertEquals(110, key.getCurrentTransactionId());
-    assertEquals("second", value(pair.nextRecord));
+    assertEquals("second", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketId());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(40, key.getTransactionId());
     assertEquals(50, key.getBucketId());
     assertEquals(61, key.getRowId());
     assertEquals(140, key.getCurrentTransactionId());
-    assertEquals("fifth", value(pair.nextRecord));
+    assertEquals("fifth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
 
     Mockito.verify(recordReader).close();
   }
@@ -299,23 +297,23 @@ public void testOriginalReaderPair() throws Exception {
     fs.create(root);
     ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
         new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
-    pair.advnaceToMinKey();
-    RecordReader recordReader = pair.recordReader;
+    //pair.advnaceToMinKey();
+    RecordReader recordReader = pair.getRecordReader();
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("third", value(pair.nextRecord));
+    assertEquals("third", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
+    pair.next(pair.nextRecord());
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
-    assertEquals("fourth", value(pair.nextRecord));
+    assertEquals("fourth", value(pair.nextRecord()));
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
 
     Mockito.verify(recordReader).close();
   }
@@ -334,44 +332,44 @@ public void testOriginalReaderPairNoMin() throws Exception {
     fs.create(root);
     ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
         new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
-    pair.advnaceToMinKey();
-    assertEquals("first", value(pair.nextRecord));
+    //pair.advnaceToMinKey();
+    assertEquals("first", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(0, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("second", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("second", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(1, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("third", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("third", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("fourth", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("fourth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals("fifth", value(pair.nextRecord));
+    pair.next(pair.nextRecord());
+    assertEquals("fifth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
     assertEquals(4, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
-    pair.next(pair.nextRecord);
-    assertEquals(null, pair.nextRecord);
-    Mockito.verify(pair.recordReader).close();
+    pair.next(pair.nextRecord());
+    assertEquals(null, pair.nextRecord());
+    Mockito.verify(pair.getRecordReader()).close();
   }
 
   @Test
@@ -438,7 +436,7 @@ public void testNewBase() throws Exception {
     OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
         false, 10, createMaximalTxnList(),
        new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
-    RecordReader rr = merger.getCurrentReader().recordReader;
+    RecordReader rr = merger.getCurrentReader().getRecordReader();
     assertEquals(0, merger.getOtherReaders().size());
     assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
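
The standalone sketch below is illustrative only and not part of the patch. It mimics the read-ahead contract that the new ReaderPair interface formalizes: nextRecord() exposes the buffered row, next() advances the underlying source, and a TreeMap keyed by each pair's current key always surfaces the globally lowest row, just as OrcRawRecordMerger's 'readers' map does. Pair, ArrayPair, and the integer keys are hypothetical stand-ins for ReaderKey, OrcStruct, and the ORC readers; keys are assumed globally unique, as ACID record identifiers are.

import java.util.Map;
import java.util.TreeMap;

public class ReadAheadMergeDemo {
  /** Stand-in for OrcRawRecordMerger.ReaderPair: a buffered row plus its sort key. */
  interface Pair {
    Integer key();        // stands in for ReaderKey
    String nextRecord();  // the buffered row; null once the source is exhausted
    void next();          // advance the underlying "reader"
  }

  /** Stand-in for ReaderPairAcid: rows pre-sorted by key within one source. */
  static class ArrayPair implements Pair {
    private final int[] keys;
    private final String[] rows;
    private int i = 0;
    ArrayPair(int[] keys, String[] rows) { this.keys = keys; this.rows = rows; }
    public Integer key() { return keys[i]; }  // only called while a row is buffered
    public String nextRecord() { return i < rows.length ? rows[i] : null; }
    public void next() { i++; }
  }

  public static void main(String[] args) {
    // Like the merger's 'readers' TreeMap, the first entry always holds the
    // pair whose buffered row has the globally lowest key.
    TreeMap<Integer, Pair> readers = new TreeMap<>();
    Pair[] sources = {
        new ArrayPair(new int[]{1, 3}, new String[]{"row-1", "row-3"}),
        new ArrayPair(new int[]{2}, new String[]{"row-2"})
    };
    for (Pair p : sources) {
      if (p.nextRecord() != null) {   // only register non-empty sources
        readers.put(p.key(), p);
      }
    }
    while (!readers.isEmpty()) {
      Map.Entry<Integer, Pair> first = readers.pollFirstEntry();
      Pair primary = first.getValue();
      System.out.println(first.getKey() + " -> " + primary.nextRecord());
      primary.next();                 // read ahead, as the pair c'tors now do
      if (primary.nextRecord() != null) {
        readers.put(primary.key(), primary);  // re-insert under its new key
      }
    }
    // prints: 1 -> row-1, 2 -> row-2, 3 -> row-3
  }
}

The re-insertion via readers.put(primary.key(), primary) mirrors the patched next(RecordIdentifier, OrcStruct): the merger pulls the primary pair, advances it, and pushes it back under its new key only while it still has a buffered row.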