diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 142c2d21bc..fe98e2277d 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -19,17 +19,23 @@
 package org.apache.hadoop.hive.ql;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,9 +52,13 @@
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.orc.OrcProto;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.junit.After;
 import org.junit.Assert;
@@ -873,6 +883,40 @@ public void testCrudMajorCompactionSplitGrouper() throws Exception {
     Assert.assertEquals("compacted read", rs, rsCompact);
   }
 
+  /**
+   * Tests the OrcInputFormat.isOriginal method for files in ACID and non-ACID tables.
+   * @throws IOException If there is a file reading error
+   */
+  @Test
+  public void testIsOriginal() throws IOException {
+    assertIsOriginal(new Path(TEST_WAREHOUSE_DIR, Table.ACIDTBL.name), false);
+    assertIsOriginal(new Path(TEST_WAREHOUSE_DIR, Table.NONACIDORCTBL.name), true);
+  }
+
+  /**
+   * Checks whether the files under the given path are original ORC files or ACID files, using the OrcInputFormat static methods.
+   * @param path The directory whose files are checked recursively
+   * @param expected The expected result of isOriginal
+   * @throws IOException Error when reading the files
+   */
+  private void assertIsOriginal(Path path, boolean expected) throws FileNotFoundException, IOException {
+    FileSystem fs = FileSystem.get(hiveConf);
+    RemoteIterator<LocatedFileStatus> lfs = fs.listFiles(path, true);
+    boolean foundAnyFile = false;
+    while (lfs.hasNext()) {
+      LocatedFileStatus lf = lfs.next();
+      Path file = lf.getPath();
+      if (!file.getName().startsWith(".") && !file.getName().startsWith("_")) {
+        Reader reader = OrcFile.createReader(file, OrcFile.readerOptions(new Configuration()));
+        OrcProto.Footer footer = reader.getFileTail().getFooter();
+        assertEquals("Reader based original check", expected, OrcInputFormat.isOriginal(reader));
+        assertEquals("Footer based original check", expected, OrcInputFormat.isOriginal(footer));
+        foundAnyFile = true;
+      }
+    }
+    assertTrue("Checking if any file found to check", foundAnyFile);
+  }
+
   private void restartSessionAndDriver(HiveConf conf) throws Exception {
     SessionState ss = SessionState.get();
     if (ss != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 9dac185067..4f7732b2a1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.NoDynamicValuesException;
 import org.apache.hadoop.fs.PathFilter;
@@ -120,7 +121,6 @@
 import org.apache.orc.FileFormatException;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.Footer;
-import org.apache.orc.OrcProto.Type;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
@@ -338,20 +338,26 @@ public static RecordReader createReaderFromFile(Reader file,
     return file.rowsOptions(options, conf);
   }
 
+  /**
+   * Check whether the given file is an original ORC file or an ACID file.
+   * @param file The ORC file reader to check
+   * @return false if it is an ACID file, true if it is a plain ORC file
+   */
   public static boolean isOriginal(Reader file) {
-    return !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+    return !CollectionUtils.isEqualCollection(file.getSchema().getFieldNames(),
+        OrcRecordUpdater.ALL_ACID_ROW_NAMES);
   }
 
+  /**
+   * Check whether the given file is an original ORC file or an ACID file.
+   * @param footer The footer of the file to check
+   * @return false if it is an ACID file, true if it is a plain ORC file
+   */
   public static boolean isOriginal(Footer footer) {
-    for(OrcProto.UserMetadataItem item: footer.getMetadataList()) {
-      if (item.hasName() && item.getName().equals(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) {
-        return true;
-      }
-    }
-    return false;
+    return !CollectionUtils.isEqualCollection(footer.getTypesList().get(0).getFieldNamesList(),
+        OrcRecordUpdater.ALL_ACID_ROW_NAMES);
   }
-
   public static boolean[] genIncludedColumns(TypeDescription readerSchema, List<Integer> included) {
     return genIncludedColumns(readerSchema, included, null);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 21fe9ceff3..6472e8058a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -24,6 +24,7 @@
 import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
@@ -83,6 +84,20 @@
   final static int ROW_ID = 3;
   final static int CURRENT_WRITEID = 4;
   public static final int ROW = 5;
+  static final String OPERATION_FIELD_NAME = "operation";
+  static final String ORIGINAL_WRITEID_FIELD_NAME = "originalTransaction";
+  static final String BUCKET_FIELD_NAME = "bucket";
+  static final String ROW_ID_FIELD_NAME = "rowId";
+  static final String CURRENT_WRITEID_FIELD_NAME = "currentTransaction";
+  static final String ROW_FIELD_NAME = "row";
+  public static final Collection<String> ALL_ACID_ROW_NAMES = Arrays.asList(
+      OrcRecordUpdater.BUCKET_FIELD_NAME,
+      OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
+      OrcRecordUpdater.ORIGINAL_WRITEID_FIELD_NAME,
+      OrcRecordUpdater.OPERATION_FIELD_NAME,
+      OrcRecordUpdater.ROW_FIELD_NAME,
+      OrcRecordUpdater.ROW_ID_FIELD_NAME);
+
   /**
    * total number of fields (above)
    */
@@ -190,17 +205,17 @@ OrcOptions orcOptions(OrcFile.WriterOptions opts) {
    */
   static StructObjectInspector createEventObjectInspector(ObjectInspector rowInspector) {
     List<StructField> fields = new ArrayList<StructField>();
-    fields.add(new OrcStruct.Field("operation",
+    fields.add(new OrcStruct.Field(OPERATION_FIELD_NAME,
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
-    fields.add(new OrcStruct.Field("originalTransaction",
+    fields.add(new OrcStruct.Field(ORIGINAL_WRITEID_FIELD_NAME,
         PrimitiveObjectInspectorFactory.writableLongObjectInspector, ORIGINAL_WRITEID));
-    fields.add(new OrcStruct.Field("bucket",
+    fields.add(new OrcStruct.Field(BUCKET_FIELD_NAME,
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, BUCKET));
-    fields.add(new OrcStruct.Field("rowId",
+    fields.add(new OrcStruct.Field(ROW_ID_FIELD_NAME,
         PrimitiveObjectInspectorFactory.writableLongObjectInspector, ROW_ID));
-    fields.add(new OrcStruct.Field("currentTransaction",
+    fields.add(new OrcStruct.Field(CURRENT_WRITEID_FIELD_NAME,
         PrimitiveObjectInspectorFactory.writableLongObjectInspector, CURRENT_WRITEID));
-    fields.add(new OrcStruct.Field("row", rowInspector, ROW));
+    fields.add(new OrcStruct.Field(ROW_FIELD_NAME, rowInspector, ROW));
     return new OrcStruct.OrcStructInspector(fields);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 1795bb5457..dc18ba1911 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -318,11 +318,11 @@ private void setSARG(OrcRawRecordMerger.KeyInterval keyInterval,
       RecordIdentifier k = keyInterval.getMinKey();
       b = SearchArgumentFactory.newBuilder();
       b.startAnd()  //not(ot < 7) -> ot >=7
-          .startNot().lessThan("originalTransaction",
+          .startNot().lessThan(OrcRecordUpdater.ORIGINAL_WRITEID_FIELD_NAME,
              PredicateLeaf.Type.LONG, k.getWriteId()).end();
       b.startNot().lessThan(
-          "bucket", PredicateLeaf.Type.LONG, minBucketProp).end();
-      b.startNot().lessThan("rowId",
+          OrcRecordUpdater.BUCKET_FIELD_NAME, PredicateLeaf.Type.LONG, minBucketProp).end();
+      b.startNot().lessThan(OrcRecordUpdater.ROW_ID_FIELD_NAME,
           PredicateLeaf.Type.LONG, minRowId).end();
       b.end();
     }
@@ -332,16 +332,20 @@ private void setSARG(OrcRawRecordMerger.KeyInterval keyInterval,
         b = SearchArgumentFactory.newBuilder();
       }
       b.startAnd().lessThanEquals(
-          "originalTransaction", PredicateLeaf.Type.LONG, k.getWriteId());
-      b.lessThanEquals("bucket", PredicateLeaf.Type.LONG, maxBucketProp);
-      b.lessThanEquals("rowId", PredicateLeaf.Type.LONG, maxRowId);
+          OrcRecordUpdater.ORIGINAL_WRITEID_FIELD_NAME, PredicateLeaf.Type.LONG, k.getWriteId());
+      b.lessThanEquals(OrcRecordUpdater.BUCKET_FIELD_NAME, PredicateLeaf.Type.LONG, maxBucketProp);
+      b.lessThanEquals(OrcRecordUpdater.ROW_ID_FIELD_NAME, PredicateLeaf.Type.LONG, maxRowId);
       b.end();
     }
     if(b != null) {
       deleteEventSarg = b.build();
       LOG.info("deleteReader SARG(" + deleteEventSarg + ") ");
       deleteEventReaderOptions.searchArgument(deleteEventSarg,
-          new String[] {"originalTransaction", "bucket", "rowId"});
+          new String[] {
+              OrcRecordUpdater.ORIGINAL_WRITEID_FIELD_NAME,
+              OrcRecordUpdater.BUCKET_FIELD_NAME,
+              OrcRecordUpdater.ROW_ID_FIELD_NAME
+          });
       return;
     }
     deleteEventReaderOptions.searchArgument(null, null);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 1656a5b80e..cbed1fc5f8 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -395,8 +395,13 @@ public void testNewBase() throws Exception {
     typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
         .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
         .addSubtypes(6);
-    typeBuilder.addAllFieldNames(Lists.newArrayList("operation", "originalTransaction", "bucket",
-        "rowId", "currentTransaction", "row"));
+    typeBuilder.addAllFieldNames(Lists.newArrayList(
+        OrcRecordUpdater.OPERATION_FIELD_NAME,
+        OrcRecordUpdater.ORIGINAL_WRITEID_FIELD_NAME,
+        OrcRecordUpdater.BUCKET_FIELD_NAME,
+        OrcRecordUpdater.ROW_ID_FIELD_NAME,
+        OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
+        OrcRecordUpdater.ROW_FIELD_NAME));
     types.add(typeBuilder.build());
     types.add(null);
     types.add(null);
@@ -478,15 +483,15 @@ public void testNewBase() throws Exception {
 
     List<? extends StructField> fields = eventObjectInspector.getAllStructFieldRefs();
     assertEquals(OrcRecordUpdater.FIELDS, fields.size());
-    assertEquals("operation",
+    assertEquals(OrcRecordUpdater.OPERATION_FIELD_NAME,
         fields.get(OrcRecordUpdater.OPERATION).getFieldName());
-    assertEquals("currentTransaction",
+    assertEquals(OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
         fields.get(OrcRecordUpdater.CURRENT_WRITEID).getFieldName());
-    assertEquals("originalTransaction",
+    assertEquals(OrcRecordUpdater.ORIGINAL_WRITEID_FIELD_NAME,
        fields.get(OrcRecordUpdater.ORIGINAL_WRITEID).getFieldName());
-    assertEquals("bucket",
+    assertEquals(OrcRecordUpdater.BUCKET_FIELD_NAME,
        fields.get(OrcRecordUpdater.BUCKET).getFieldName());
-    assertEquals("rowId",
+    assertEquals(OrcRecordUpdater.ROW_ID_FIELD_NAME,
        fields.get(OrcRecordUpdater.ROW_ID).getFieldName());
     StructObjectInspector rowObjectInspector =
         (StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
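
Note (not part of the patch): a minimal, self-contained sketch of the name-based check that the new isOriginal() implementations rely on, so the CollectionUtils.isEqualCollection semantics can be tried in isolation. The class name and the hard-coded field list below are illustrative only; the patch itself compares against OrcRecordUpdater.ALL_ACID_ROW_NAMES.

import java.util.Arrays;
import java.util.Collection;
import org.apache.commons.collections.CollectionUtils;

// Illustrative sketch: mirrors the field-name comparison used by the patched isOriginal() methods.
public class IsOriginalSketch {

  // The six top-level field names of an ACID event struct; ordering is irrelevant to the check.
  private static final Collection<String> ACID_ROW_NAMES = Arrays.asList(
      "operation", "originalTransaction", "bucket", "rowId", "currentTransaction", "row");

  // True when the top-level field names do NOT match the ACID event schema, i.e. an "original" file.
  static boolean isOriginalSchema(Collection<String> topLevelFieldNames) {
    // isEqualCollection ignores ordering but compares the cardinality of every element.
    return !CollectionUtils.isEqualCollection(topLevelFieldNames, ACID_ROW_NAMES);
  }

  public static void main(String[] args) {
    // A plain (original) ORC file exposes the user columns at the top level.
    System.out.println(isOriginalSchema(Arrays.asList("a", "b")));                 // true
    // An ACID file exposes exactly the six event columns, in any order.
    System.out.println(isOriginalSchema(Arrays.asList("row", "operation", "bucket",
        "rowId", "currentTransaction", "originalTransaction")));                   // false
  }
}

Because the comparison is purely name-based and order-insensitive, any top-level schema other than exactly these six fields is classified as original.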