diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 970af0e140..d850062377 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -47,6 +47,7 @@
 import org.apache.orc.OrcConf;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.WriterImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -559,6 +560,17 @@ public SerDeStats getStats() {
     int lastBucket;
     long lastRowId;
     AcidStats acidStats = new AcidStats();
+    /**
+     * {@link #preStripeWrite(OrcFile.WriterContext)} is normally called by the
+     * {@link org.apache.orc.MemoryManager}, except on close().
+     * {@link org.apache.orc.impl.WriterImpl#close()} calls preFooterWrite() before it calls
+     * {@link WriterImpl#flushStripe()}, which causes the {@link #ACID_KEY_INDEX_NAME} index to
+     * be missing its last entry.  This should also be fixed in ORC, but that requires upgrading
+     * the ORC jars to take effect.
+     *
+     * This counter is used to decide whether we need to make the preStripeWrite() call here.
+     */
+    private long numKeysCurrentStripe = 0;
 
     KeyIndexBuilder(String name) {
       this.builderName = name;
@@ -572,11 +584,15 @@ public void preStripeWrite(OrcFile.WriterContext context
       lastKey.append(',');
       lastKey.append(lastRowId);
       lastKey.append(';');
+      numKeysCurrentStripe = 0;
     }
 
     @Override
     public void preFooterWrite(OrcFile.WriterContext context
                                ) throws IOException {
+      if (numKeysCurrentStripe > 0) {
+        preStripeWrite(context);
+      }
       context.getWriter().addUserMetadata(ACID_KEY_INDEX_NAME,
           UTF8.encode(lastKey.toString()));
       context.getWriter().addUserMetadata(OrcAcidUtils.ACID_STATS,
@@ -600,6 +616,7 @@ void addKey(int op, long transaction, int bucket, long rowId) {
       lastTransaction = transaction;
       lastBucket = bucket;
       lastRowId = rowId;
+      numKeysCurrentStripe++;
     }
   }
 
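Not part of the patch, for context on the javadoc above: KeyIndexBuilder serializes one "lastTransaction,lastBucket,lastRowId;" entry per stripe into the hive.acid.key.index user metadata, and a reader that resolves stripe key bounds by stripe number needs exactly one entry per stripe. The sketch below assumes only that entry format plus the sample metadata values quoted in the test further down (which suggest the test file ends up with two stripes); the class name and parse helper are hypothetical, not Hive code.

import java.util.ArrayList;
import java.util.List;

public class AcidKeyIndexSketch {
  // Splits a hive.acid.key.index value into one {transaction, bucket, rowId} entry per stripe.
  static List<long[]> parse(String keyIndex) {
    List<long[]> entries = new ArrayList<>();
    for (String entry : keyIndex.split(";")) {
      if (entry.isEmpty()) {
        continue;
      }
      String[] parts = entry.split(",");
      entries.add(new long[] {
          Long.parseLong(parts[0]),    // last transaction written to the stripe
          Long.parseLong(parts[1]),    // last encoded bucket written to the stripe
          Long.parseLong(parts[2])});  // last row id written to the stripe
    }
    return entries;
  }

  public static void main(String[] args) {
    // Values taken from the "before"/"with the fix" comment in the test below (two stripes).
    List<long[]> beforeFix = parse("1,536870916,999;");
    List<long[]> afterFix = parse("1,536870916,999;1,536870920,999;");
    System.out.println("entries before the fix: " + beforeFix.size()); // 1 entry for 2 stripes
    System.out.println("entries with the fix:   " + afterFix.size());  // 2 entries for 2 stripes
    // A reader that indexes these entries by stripe number has no bound for the last stripe
    // with the pre-fix value, which is the situation the new test exercises.
  }
}
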
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 0ac29fad94..c1f2e6aae6 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -68,6 +69,7 @@
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -115,6 +117,7 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.junit.Before;
 import org.junit.Rule;
@@ -3951,4 +3954,119 @@ public void testColumnProjectionWithAcid() throws Exception {
     assertEquals(1000, record);
     reader.close();
   }
+
+  @Test
+  public void testAcidReadPastLastStripeOffset() throws Exception {
+    Path baseDir = new Path(workDir, "base_00100");
+    testFilePath = new Path(baseDir, "bucket_00000");
+    fs.mkdirs(baseDir);
+    fs.delete(testFilePath, true);
+    TypeDescription fileSchema =
+        TypeDescription.fromString("struct<operation:int,originalTransaction:bigint,bucket:int," +
+            "rowId:bigint,currentTransaction:bigint,row:struct<a:int,b:struct<c:int>,d:string>>");
+
+    OrcRecordUpdater.KeyIndexBuilder indexBuilder = new OrcRecordUpdater.KeyIndexBuilder("test");
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
+        .fileSystem(fs)
+        .setSchema(fileSchema)
+        .compress(org.apache.orc.CompressionKind.NONE)
+        .callback(indexBuilder)
+        .stripeSize(128);
+    // Create an ORC file with a small stripe size so we can write multiple stripes.
+    Writer writer = OrcFile.createWriter(testFilePath, options);
+    VectorizedRowBatch batch = fileSchema.createRowBatch(1000);
+    batch.size = 1000;
+    StructColumnVector scv = (StructColumnVector) batch.cols[5];
+    // operation
+    batch.cols[0].isRepeating = true;
+    ((LongColumnVector) batch.cols[0]).vector[0] = OrcRecordUpdater.INSERT_OPERATION;
+    // original transaction
+    batch.cols[1].isRepeating = true;
+    ((LongColumnVector) batch.cols[1]).vector[0] = 1;
+    // bucket
+    batch.cols[2].isRepeating = true;
+    ((LongColumnVector) batch.cols[2]).vector[0] = BucketCodec.V1.encode(new AcidOutputFormat
+        .Options(conf).bucket(0).statementId(0));
+    // current transaction
+    batch.cols[4].isRepeating = true;
+    ((LongColumnVector) batch.cols[4]).vector[0] = 1;
+
+    LongColumnVector lcv = (LongColumnVector)
+        ((StructColumnVector) scv.fields[1]).fields[0];
+    for (int r = 0; r < 1000; r++) {
+      // row id
+      ((LongColumnVector) batch.cols[3]).vector[r] = r;
+      // a
+      ((LongColumnVector) scv.fields[0]).vector[r] = r * 42;
+      // b.c
+      lcv.vector[r] = r * 10001;
+      // d
+      ((BytesColumnVector) scv.fields[2]).setVal(r,
+          Integer.toHexString(r).getBytes(StandardCharsets.UTF_8));
+      indexBuilder.addKey(OrcRecordUpdater.INSERT_OPERATION,
+          1, (int) (((LongColumnVector) batch.cols[2]).vector[0]), r);
+    }
+
+    // Minimum 5000 rows per stripe.
+    for (int idx = 0; idx < 8; ++idx) {
+      writer.addRowBatch(batch);
+      // bucket
+      batch.cols[2].isRepeating = true;
+      ((LongColumnVector) batch.cols[2]).vector[0] = BucketCodec.V1.encode(new AcidOutputFormat
+          .Options(conf).bucket(0).statementId(idx + 1));
+      for (long row_id : ((LongColumnVector) batch.cols[3]).vector) {
+        indexBuilder.addKey(OrcRecordUpdater.INSERT_OPERATION,
+            1, (int) (((LongColumnVector) batch.cols[2]).vector[0]), row_id);
+      }
+    }
+    writer.close();
+    // Before the fix the user metadata was missing the last stripe's entry:
+    //   hive.acid.key.index=1,536870916,999;
+    //   hive.acid.stats=9000,0,0
+    // With the fix it has one entry per stripe:
+    //   hive.acid.key.index=1,536870916,999;1,536870920,999;
+    //   hive.acid.stats=9000,0,0
+    long fileLength = fs.getFileStatus(testFilePath).getLen();
+
+    // Find the last stripe.
+    Reader orcReader = OrcFile.createReader(fs, testFilePath);
+    List<StripeInformation> stripes = orcReader.getStripes();
+    StripeInformation lastStripe = stripes.get(stripes.size() - 1);
+    long lastStripeOffset = lastStripe.getOffset();
+    long lastStripeLength = lastStripe.getLength();
+
+    // Read with the same schema, with an include list.
+    conf.set(ValidTxnList.VALID_TXNS_KEY, "100:99:");
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "a,b,d");
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "int,struct<c:int>,string");
+    conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
+
+    LOG.info("Last stripe " + stripes.size() +
+        ", offset " + lastStripeOffset + ", length " + lastStripeLength);
+    // Specify an OrcSplit that starts beyond the offset of the last stripe.
+    OrcSplit split = new OrcSplit(testFilePath, null, lastStripeOffset + 1, lastStripeLength,
+        new String[0], null, false, true,
+        new ArrayList<AcidInputFormat.DeltaMetaData>(), fileLength, fileLength, workDir);
+    OrcInputFormat inputFormat = new OrcInputFormat();
+    AcidInputFormat.RowReader<OrcStruct> reader = inputFormat.getReader(split,
+        new AcidInputFormat.Options(conf));
+
+    int record = 0;
+    RecordIdentifier id = reader.createKey();
+    OrcStruct struct = reader.createValue();
+    // Iterate through any records.
+    // Because the read offset is past the last stripe's offset, the rows of the last stripe
+    // are not returned, so we expect 0 records.
+    while (reader.next(id, struct)) {
+      record += 1;
+    }
+    assertEquals(0, record);
+
+    reader.close();
+  }
 }
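A possible follow-up check, separate from the patch: the test asserts the reader behavior, but the fix is also directly visible in the bucket file's user metadata. Below is a minimal sketch, using the core ORC reader API and a placeholder file path, that compares the number of hive.acid.key.index entries with the number of stripes; with the fix the two counts match. The class name is hypothetical.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class AcidKeyIndexCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder path to an ACID bucket file, e.g. .../base_00100/bucket_00000
    Path bucketFile = new Path(args[0]);
    Reader reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
    int stripes = reader.getStripes().size();
    String keyIndex = reader.hasMetadataValue("hive.acid.key.index")
        ? StandardCharsets.UTF_8.decode(reader.getMetadataValue("hive.acid.key.index")).toString()
        : "";
    // Each stripe should contribute one "transaction,bucket,rowId;" entry.
    int entries = keyIndex.isEmpty() ? 0 : keyIndex.split(";").length;
    System.out.println("stripes=" + stripes + " keyIndexEntries=" + entries);
    // With the fix entries == stripes; without it, the last stripe's entry is missing.
  }
}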