diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index b9094bf..efde2db 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -146,7 +146,7 @@ public int compareTo(RecordIdentifier other) {
     private boolean isSameRow(ReaderKey other) {
       return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
     }
-    
+
     public long getCurrentTransactionId() {
       return currentTransactionId;
     }
@@ -285,7 +285,7 @@ void next(OrcStruct next) throws IOException {
         ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
             .set(0);
         ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
-            .set(0);
+            .set(nextRowId);
         nextRecord.setFieldValue(OrcRecordUpdater.ROW,
             recordReader.next(OrcRecordUpdater.getRow(next)));
       }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index d48e441..e796250 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -18,6 +18,17 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -36,9 +47,9 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
@@ -54,18 +65,6 @@
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * TODO: this should be merged with TestTxnCommands once that is checked in
  * specifically the tests; the supporting code here is just a clone of TestTxnCommands
@@ -271,6 +270,35 @@ public void testNonAcidInsert() throws Exception {
     List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
   }
+  @Test
+  public void testOriginalFileReaderWhenNonAcidConvertedToAcid() throws Exception {
+    // 1. Insert five rows to Non-ACID table.
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(3,4),(5,6),(7,8),(9,10)");
+
+    // 2. Convert NONACIDORCTBL to ACID table.
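+    // The conversion leaves the existing ORC data files in place as "original" files;
+    // the ACID reader synthesizes row ids for them at read time.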
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = b*2 where b in (4,10)"); + runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 7"); + + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b"); + int[][] resultData = new int[][] {{1,2}, {3,8}, {5,6}, {9,20}}; + Assert.assertEquals(stringifyValues(resultData), rs); + // 3. Perform a major compaction. + runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + + // 3. Perform a delete. + runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1"); + + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b"); + resultData = new int[][] {{3,8}, {5,6}, {9,20}}; + Assert.assertEquals(stringifyValues(resultData), rs); + } + /** * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction * 1. Insert a row to Non-ACID table diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index a675a34..6648829 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -3466,4 +3466,83 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception { // revert back to local fs conf.set("fs.defaultFS", "file:///"); } + + /** + * also see {@link TestOrcFile#testPredicatePushdown()} + * This tests that {@link RecordReader#getRowNumber()} works with multiple splits + * @throws Exception + */ + @Test + public void testRowNumberUniquenessInDifferentSplits() throws Exception { + Properties properties = new Properties(); + properties.setProperty("columns", "x,y"); + properties.setProperty("columns.types", "int:int"); + StructObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = (StructObjectInspector) + ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // Save the conf variable values so that they can be restored later. + long oldDefaultStripeSize = conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname, -1L); + long oldMaxSplitSize = conf.getLong(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, -1L); + + // Set the conf variable values for this test. + long newStripeSize = 10000L; // 10000 bytes per stripe + long newMaxSplitSize = 100L; // 1024 bytes per split + conf.setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname, newStripeSize); + conf.setLong(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, newMaxSplitSize); + + SerDe serde = new OrcSerde(); + HiveOutputFormat outFormat = new OrcOutputFormat(); + org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer = + outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true, + properties, Reporter.NULL); + // The following loop should create 20 stripes in the orc file. 
+    for (int i = 0; i < newStripeSize * 10; ++i) {
+      writer.write(serde.serialize(new MyRow(i,i+1), inspector));
+    }
+    writer.close(true);
+    serde = new OrcSerde();
+    SerDeUtils.initializeSerDe(serde, conf, properties, null);
+    assertEquals(OrcSerde.OrcSerdeRow.class, serde.getSerializedClass());
+    inspector = (StructObjectInspector) serde.getObjectInspector();
+    assertEquals("struct<x:int,y:int>", inspector.getTypeName());
+    InputFormat<?,?> in = new OrcInputFormat();
+    FileInputFormat.setInputPaths(conf, testFilePath.toString());
+    int numExpectedSplits = 20;
+    InputSplit[] splits = in.getSplits(conf, numExpectedSplits);
+    assertEquals(numExpectedSplits, splits.length);
+
+    for (int i = 0; i < numExpectedSplits; ++i) {
+      OrcSplit split = (OrcSplit) splits[i];
+      Reader.Options orcReaderOptions = new Reader.Options();
+      orcReaderOptions.range(split.getStart(), split.getLength());
+      OrcFile.ReaderOptions qlReaderOptions = OrcFile.readerOptions(conf).maxLength(split.getFileLength());
+      Reader reader = OrcFile.createReader(split.getPath(), qlReaderOptions);
+      RecordReader recordReader = reader.rowsOptions(orcReaderOptions);
+      for(int j = 0; recordReader.hasNext(); j++) {
+        long rowNum = (i * 5000) + j;
+        long rowNumActual = recordReader.getRowNumber();
+        assertEquals("rowNum=" + rowNum, rowNum, rowNumActual);
+        Object row = recordReader.next(null);
+      }
+      recordReader.close();
+    }
+
+    // Reset the conf variable values that we changed for this test.
+    if (oldDefaultStripeSize != -1L) {
+      conf.setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname, oldDefaultStripeSize);
+    } else {
+      // this means that nothing was set for the default stripe size previously, so we should unset it.
+      conf.unset(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname);
+    }
+    if (oldMaxSplitSize != -1L) {
+      conf.setLong(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, oldMaxSplitSize);
+    } else {
+      // this means that nothing was set for the max split size previously, so we should unset it.
+      conf.unset(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname);
+    }
+  }
 }