diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5700fb9325..d9c993432e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1321,6 +1321,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false), HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false), HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only. Will cause Heartbeater to fail.", false), + TESTMODE_BUCKET_CODEC_VERSION("hive.test.bucketcodec.version", 1, + "For testing only. Will make ACID subsystem write RecordIdentifier.bucketId in specified\n" + + "format", false), HIVEMERGEMAPFILES("hive.merge.mapfiles", true, "Merge small files at the end of a map-only job"), diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java index 571e076588..7c2cadefa7 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java @@ -19,6 +19,8 @@ import java.util.List; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -68,7 +70,9 @@ public BucketIdResolverImpl(ObjectInspector objectInspector, int recordIdColumn, @Override public Object attachBucketIdToRecord(Object record) { int bucketId = computeBucketId(record); - RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID); + int bucketProperty = + BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucketId)); + RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketProperty, INVALID_ROW_ID); structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier); return record; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java index 1ad0842d98..ae23153604 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.util.ReflectionUtils; @@ -183,7 +184,7 @@ public void flush() throws IOException { private void reconfigureState(OperationType operationType, List newPartitionValues, Object record) throws WorkerException { RecordIdentifier newRecordIdentifier = 
extractRecordIdentifier(operationType, newPartitionValues, record); - int newBucketId = newRecordIdentifier.getBucketId(); + int newBucketId = newRecordIdentifier.getBucketProperty(); if (newPartitionValues == null) { newPartitionValues = Collections.emptyList(); @@ -209,8 +210,10 @@ private void reconfigureState(OperationType operationType, List newParti private RecordIdentifier extractRecordIdentifier(OperationType operationType, List newPartitionValues, Object record) throws BucketIdException { RecordIdentifier recordIdentifier = recordInspector.extractRecordIdentifier(record); + int bucketIdFromRecord = BucketCodec.determineVersion( + recordIdentifier.getBucketProperty()).decodeWriterId(recordIdentifier.getBucketProperty()); int computedBucketId = bucketIdResolver.computeBucketId(record); - if (operationType != OperationType.DELETE && recordIdentifier.getBucketId() != computedBucketId) { + if (operationType != OperationType.DELETE && bucketIdFromRecord != computedBucketId) { throw new BucketIdException("RecordIdentifier.bucketId != computed bucketId (" + computedBucketId + ") for record " + recordIdentifier + " in partition " + newPartitionValues + "."); } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java index 8998de99e9..05cf8b7386 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -31,20 +33,24 @@ private final long transactionId; private final Path partitionPath; - private final int bucketId; + private final int bucketProperty; private final Configuration configuration; private final int recordIdColumn; private final ObjectInspector objectInspector; private RecordUpdater updater; + /** + * @param bucketProperty - from existing {@link RecordIdentifier#getBucketProperty()} + * @throws IOException + */ public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector, - AcidOutputFormat outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException { + AcidOutputFormat outputFormat, long transactionId, Path partitionPath, int bucketProperty) throws IOException { this.configuration = configuration; this.recordIdColumn = recordIdColumn; this.objectInspector = objectInspector; this.transactionId = transactionId; this.partitionPath = partitionPath; - this.bucketId = bucketId; + this.bucketProperty = bucketProperty; updater = createRecordUpdater(outputFormat); } @@ -84,10 +90,12 @@ public void close() throws IOException { @Override public String toString() { return "ObjectInspectorMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath - + ", bucketId=" + bucketId + "]"; + + ", bucketId=" + bucketProperty + "]"; } protected RecordUpdater createRecordUpdater(AcidOutputFormat outputFormat) throws IOException { + int bucketId = BucketCodec + 
.determineVersion(bucketProperty).decodeWriterId(bucketProperty); return outputFormat.getRecordUpdater( partitionPath, new AcidOutputFormat.Options(configuration) diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index 686767908a..de41d344c1 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -148,7 +147,7 @@ public void assertMaxTransactionId(long expectedMaxTransactionId) { while (recordReader.next(key, value)) { RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier(); Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(), - recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString()); + recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString()); System.out.println(record); records.add(record); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java index f1de1dfe4c..ab9f313686 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hive.hcatalog.streaming.TestStreaming; import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory; @@ -101,6 +103,10 @@ public void setup() throws Exception { .addColumn("msg", "string") .bucketCols(Collections.singletonList("string")); } + private static int encodeBucket(int bucketId) { + return BucketCodec.V1.encode( + new AcidOutputFormat.Options(null).bucket(bucketId)); + } @Test public void testTransactionBatchEmptyCommitPartitioned() throws Exception { @@ -242,7 +248,8 @@ public void testTransactionBatchCommitPartitioned() throws Exception { List readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 0L))); assertThat(transaction.getState(), is(COMMITTED)); client.close(); @@ -299,7 +306,8 @@ public void testMulti() throws Exception { List readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); - 
assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 0L))); // EUROPE_UK streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); @@ -310,7 +318,8 @@ public void testMulti() throws Exception { readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 0L))); // EUROPE_FRANCE streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); @@ -321,9 +330,11 @@ public void testMulti() throws Exception { readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(2)); assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 0L))); assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}")); - assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 1L))); client.close(); } @@ -369,7 +380,8 @@ public void testTransactionBatchCommitUnpartitioned() throws Exception { List readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 0L))); assertThat(transaction.getState(), is(COMMITTED)); client.close(); @@ -499,13 +511,15 @@ public void testUpdatesAndDeletes() throws Exception { "Namaste streaming 3")); mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L, - 0, 1L))); + encodeBucket(0), 1L))); mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3); - mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L))); + mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, + encodeBucket(0), 0L))); mutateCoordinator.delete(EUROPE_FRANCE, - new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L))); + new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, + encodeBucket(0), 0L))); mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier( - 1L, 0, 1L))); + 1L, encodeBucket(0), 1L))); mutateCoordinator.close(); mutateTransaction.commit(); @@ -518,11 +532,14 @@ public void testUpdatesAndDeletes() throws Exception { List indiaRecords = indiaAssertions.readRecords(); assertThat(indiaRecords.size(), is(3)); assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}")); - assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 0L))); assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste 
streaming 2}")); - assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 1L))); assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}")); - assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L))); + assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, + encodeBucket(0), 0L))); StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); ukAssertions.assertMinTransactionId(1L); @@ -530,7 +547,8 @@ public void testUpdatesAndDeletes() throws Exception { List ukRecords = ukAssertions.readRecords(); assertThat(ukRecords.size(), is(1)); assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}")); - assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 1L))); StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); franceAssertions.assertMinTransactionId(1L); @@ -538,7 +556,8 @@ public void testUpdatesAndDeletes() throws Exception { List franceRecords = franceAssertions.readRecords(); assertThat(franceRecords.size(), is(1)); assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}")); - assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, + encodeBucket(0), 1L))); client.close(); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java index 437946b0c6..03c28a33c8 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java @@ -20,6 +20,8 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hive.hcatalog.streaming.mutate.MutableRecord; @@ -40,7 +42,9 @@ public void testAttachBucketIdToRecord() { MutableRecord record = new MutableRecord(1, "hello"); capturingBucketIdResolver.attachBucketIdToRecord(record); - assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L))); + assertThat(record.rowId, is(new RecordIdentifier(-1L, + BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(1)), + -1L))); assertThat(record.id, is(1)); assertThat(record.msg.toString(), is("hello")); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java index 9aeeb312d9..2273e06531 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java @@ -75,7 +75,7 @@ public void injectMocks() throws IOException { public void 
testCreatesRecordReader() throws IOException { verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture()); Options options = captureOptions.getValue(); - assertThat(options.getBucket(), is(BUCKET_ID)); + assertThat(options.getBucketId(), is(BUCKET_ID)); assertThat(options.getConfiguration(), is((Configuration) configuration)); assertThat(options.getInspector(), is(mockObjectInspector)); assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 3e0943251e..4d46d65227 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; @@ -75,7 +76,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; -import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -768,8 +768,10 @@ public void process(Object row, int tag) throws HiveException { // Find the bucket id, and switch buckets if need to ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector; Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField); - int bucketNum = + int bucketProperty = bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)); + int bucketNum = + BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty); if (fpaths.acidLastBucket != bucketNum) { fpaths.acidLastBucket = bucketNum; // Switch files diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827424..5ba011cfd2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -50,7 +50,7 @@ private Reporter reporter; private long minimumTransactionId; private long maximumTransactionId; - private int bucket; + private int bucketId; private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -170,12 +170,12 @@ public Options maximumTransactionId(long max) { } /** - * The bucket that is included in this file. - * @param bucket the bucket number + * The bucketId that is included in this file. 
+ * @param bucket the bucketId number * @return this */ public Options bucket(int bucket) { - this.bucket = bucket; + this.bucketId = bucket; return this; } @@ -275,8 +275,8 @@ public boolean isWritingDeleteDelta() { return writingDeleteDelta; } - public int getBucket() { - return bucket; + public int getBucketId() { + return bucketId; } public int getRecordIdColumn() { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index da00bb3363..7288c8c355 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -184,7 +184,7 @@ public static Path createFilename(Path directory, String subdir; if (options.getOldStyle()) { return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS, - options.getBucket()) + "_0"); + options.getBucketId()) + "_0"); } else if (options.isWritingBase()) { subdir = BASE_PREFIX + String.format(DELTA_DIGITS, options.getMaximumTransactionId()); @@ -205,7 +205,7 @@ public static Path createFilename(Path directory, options.getMaximumTransactionId(), options.getStatementId()); } - return createBucketFile(new Path(directory, subdir), options.getBucket()); + return createBucketFile(new Path(directory, subdir), options.getBucketId()); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java new file mode 100644 index 0000000000..2e69e734ca --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java @@ -0,0 +1,108 @@ +package org.apache.hadoop.hive.ql.io; + +/** + * This class makes sense of {@link RecordIdentifier#getBucketProperty()}. Up until ASF Hive 3.0 this + * field was simply the bucket ID. Since 3.0 it does bit packing to store several things: + * top 3 bits - version describing the format (we can only have 8). + * The rest is version specific - see below. + */ +public enum BucketCodec { + /** + * This is the "legacy" version. The whole {@code bucket} value just has the bucket ID in it. + * The numeric code for this version is 0. (Assumes bucket ID takes less than 29 bits... which + * implies top 3 bits are 000 so data written before Hive 3.0 is readable with this scheme). + */ + V0(0) { + @Override + public int decodeWriterId(int bucketProperty) { + return bucketProperty; + } + @Override + public int decodeStatementId(int bucketProperty) { + return 0; + } + @Override + public int encode(AcidOutputFormat.Options options) { + return options.getBucketId(); + } + }, + /** + * Represents format of "bucket" property in Hive 3.0. + * top 3 bits - version code. + * next 14 bits - the bucket ID + * remaining 15 bits - the statement ID - 0-based numbering of all statements within a + * transaction. Each leg of a multi-insert statement gets a separate statement ID. + * + * Constructs like Merge and Multi-Insert may have multiple tasks writing data that belongs to + * the same physical bucket file. For example, a Merge stmt with update and insert clauses, + * (and split update enabled - should be the default in 3.0). A task on behalf of insert may + * be writing a row into bucket 0 and another task in the update branch may be writing an insert + * event into bucket 0. Each of these task are writing to different delta directory - distinguished + * by statement ID. By including both bucket ID and statement ID in {@link RecordIdentifier} + * we ensure that {@link RecordIdentifier} is unique. 
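For illustration only (not part of the patch): a minimal sketch of the V1 layout described above, using the encode/decode API introduced in this file and assuming bucketId=1, statementId=0; the hex values are worked out by hand from the masks that follow below.
// V1 layout: [3 bits version][14 bits bucketId][15 bits statementId]
//   version 1     -> 1 << 29 = 0x2000_0000
//   bucketId 1    -> 1 << 15 = 0x0000_8000
//   statementId 0 ->           0x0000_0000
//   encoded bucketProperty    = 0x2000_8000 (536_903_680)
int bucketProperty = BucketCodec.V1.encode(
    new AcidOutputFormat.Options(null).bucket(1).statementId(0));
assert BucketCodec.determineVersion(bucketProperty) == BucketCodec.V1;
assert BucketCodec.V1.decodeWriterId(bucketProperty) == 1;    // bucket/writer id
assert BucketCodec.V1.decodeStatementId(bucketProperty) == 0; // statement id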
+ * + * The intent is that sorting rows by {@link RecordIdentifier} groups rows in the same physical + * bucket next to each other. + * For any row created by a given version of Hive, top 3 bits are constant. The next + * most significant bits are the bucket ID, then the statement ID. This ensures that + * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} works which is + * designed so that each task only needs to keep 1 writer opened at a time. It could be + * configured such that a single writer sees data for multiple buckets so it must "group" data + * by bucket ID (and then sort within each bucket as required) which is achieved via sorting + * by {@link RecordIdentifier} which includes the {@link RecordIdentifier#getBucketProperty()} + * which has the actual bucket ID in the high order bits. This scheme also ensures that + * {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator#process(Object, int)} works in case + * there numBuckets > numReducers. (The later could be fixed by changing how writers are + * initialized in "if (fpaths.acidLastBucket != bucketNum) {") + */ + V1(1) { + @Override + public int decodeWriterId(int bucketProperty) { + return (bucketProperty & 0b0001_1111_1111_1111_1000_0000_0000_0000) >>> 15; + } + @Override + public int decodeStatementId(int bucketProperty) { + return (bucketProperty & 0b0000_0000_0000_0000_0111_1111_1111_1111); + } + @Override + public int encode(AcidOutputFormat.Options options) { + return this.version << 29 | options.getBucketId() << 15 | + (options.getStatementId() >= 0 ? options.getStatementId() : 0); + } + }; + private static int TOP3BITS_MASK = 0b1110_0000_0000_0000_0000_0000_0000_0000; + public static BucketCodec determineVersion(int bucket) { + assert 7 << 29 == BucketCodec.TOP3BITS_MASK; + //look at top 3 bits and return appropriate enum + try { + return getCodec((BucketCodec.TOP3BITS_MASK & bucket) >>> 29); + } + catch(IllegalArgumentException ex) { + throw new IllegalArgumentException(ex.getMessage() + " Cannot decode version from " + bucket); + } + } + public static BucketCodec getCodec(int version) { + switch (version) { + case 0: + return BucketCodec.V0; + case 1: + return BucketCodec.V1; + default: + throw new IllegalArgumentException("Illegal 'bucket' format. Version=" + version); + } + } + final int version; + BucketCodec(int version) { + this.version = version; + } + + /** + * For bucketed tables this the bucketId, otherwise writerId + */ + public abstract int decodeWriterId(int bucketProperty); + public abstract int decodeStatementId(int bucketProperty); + public abstract int encode(AcidOutputFormat.Options options); + public int getVersion() { + return version; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 7f2c169ce5..87635c2442 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -89,7 +89,7 @@ public static void toArray(RecordIdentifier ri, Object[] struct) { return; } struct[Field.transactionId.ordinal()] = ri.getTransactionId(); - struct[Field.bucketId.ordinal()] = ri.getBucketId(); + struct[Field.bucketId.ordinal()] = ri.getBucketProperty(); struct[Field.rowId.ordinal()] = ri.getRowId(); } } @@ -142,10 +142,10 @@ public long getTransactionId() { } /** - * What was the original bucket id for the last row? 
- * @return the bucket id + * See {@link BucketCodec} for details + * @return the bucket value; */ - public int getBucketId() { + public int getBucketProperty() { return bucketId; } @@ -219,7 +219,16 @@ public int hashCode() { @Override public String toString() { - return "{originalTxn: " + transactionId + ", bucket: " + - bucketId + ", row: " + getRowId() + "}"; + BucketCodec codec = + BucketCodec.determineVersion(bucketId); + String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) + + "." + codec.decodeStatementId(bucketId) + ")"; + return "{originalTxn: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}"; + } + protected String bucketToString() { + BucketCodec codec = + BucketCodec.determineVersion(bucketId); + return "bucket: " + bucketId + "(" + codec.getVersion() + "." + + codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) + ")"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 0ef7c758d4..35c4c56a26 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -2056,7 +2056,7 @@ static Reader createOrcReaderForSplit(Configuration conf, OrcSplit orcSplit) thr static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) { if (orcSplit.hasBase()) { - return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId(); } else { return (int) orcSplit.getStart(); } @@ -2203,7 +2203,7 @@ private static boolean isStripeSatisfyPredicate( AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename (child.getFileStatus().getPath(), context.conf); opts.writingBase(true); - int b = opts.getBucket(); + int b = opts.getBucketId(); // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. if (b >= 0 && b < covered.length) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806e70..da74a8e2ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -77,6 +76,13 @@ @VisibleForTesting public final static class ReaderKey extends RecordIdentifier{ private long currentTransactionId; + /** + * This is the value from delta file name which may be different from value encode in + * {@link RecordIdentifier#getBucketProperty()} in case of Update/Delete. + * So for Acid 1.0 + multi-stmt txn, if {@code isSameRow() == true}, then it must be an update + * or delete event. For Acid 2.0 + multi-stmt txn, it must be a delete event. 
+ * No two Insert events can ever agree on {@link RecordIdentifier}. + */ private int statementId;//sort on this descending, like currentTransactionId public ReaderKey() { @@ -171,8 +177,8 @@ public int compareRow(RecordIdentifier other) { @Override public String toString() { - return "{originalTxn: " + getTransactionId() + ", bucket: " + - getBucketId() + ", row: " + getRowId() + ", currentTxn: " + + return "{originalTxn: " + getTransactionId() + ", " + + bucketToString() + ", row: " + getRowId() + ", currentTxn: " + currentTransactionId + ", statementId: "+ statementId + "}"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 65f4a24750..d40b89ae14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -29,10 +29,13 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -48,10 +51,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** * A RecordUpdater where the files are stored as ORC. + * A note on various record structures: the {@code row} coming in (as in {@link #insert(long, Object)} + * for example), is a struct like <RecordIdentifier, f1, ... fn> but what is written to the file + * is <op, otid, bucket, rowid, ctid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)}) + * So there are OIs here to make the translation. */ public class OrcRecordUpdater implements RecordUpdater { @@ -96,7 +101,6 @@ private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); private long insertedRows = 0; - private long rowIdOffset = 0; // This records how many rows have been inserted or deleted. It is separate from insertedRows // because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0; @@ -111,6 +115,7 @@ private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier private LongObjectInspector origTxnInspector; // OI for the original txn inside the record // identifer + private IntObjectInspector bucketInspector; static int getOperation(OrcStruct struct) { return ((IntWritable) struct.getFieldValue(OPERATION)).get(); @@ -200,7 +205,18 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { this.acidOperationalProperties = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); } - this.bucket.set(options.getBucket()); + BucketCodec bucketCodec = BucketCodec.V1; + if(options.getConfiguration() != null) { + //so that we can test "old" files + Configuration hc = options.getConfiguration(); + if(hc.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.name(), false) || + hc.getBoolean(HiveConf.ConfVars.HIVE_IN_TEZ_TEST.name(), false)) { + bucketCodec = BucketCodec.getCodec( + hc.getInt(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION.name(), + BucketCodec.V1.getVersion())); + } + } + this.bucket.set(bucketCodec.encode(options)); this.path = AcidUtils.createFilename(path, options); this.deleteEventWriter = null; this.deleteEventPath = null; @@ -283,41 +299,6 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { public String toString() { return getClass().getName() + "[" + path +"]"; } - /** - * To handle multiple INSERT... statements in a single transaction, we want to make sure - * to generate unique {@code rowId} for all inserted rows of the transaction. - * @return largest rowId created by previous statements (maybe 0) - * @throws IOException - */ - private long findRowIdOffsetForInsert() throws IOException { - /* - * 1. need to know bucket we are writing to - * 2. need to know which delta dir it's in - * Then, - * 1. find the same bucket file in previous (insert) delta dir for this txn - * (Note: in case of split_update, we can ignore the delete_delta dirs) - * 2. read the footer and get AcidStats which has insert count - * 2.1 if AcidStats.inserts>0 add to the insert count. - * else go to previous delta file - * For example, consider insert/update/insert case...*/ - if(options.getStatementId() <= 0) { - return 0;//there is only 1 statement in this transaction (so far) - } - long totalInserts = 0; - for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { - Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); - if(!fs.exists(matchingBucket)) { - continue; - } - Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration())); - //no close() on Reader?! - AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); - if(acidStats.inserts > 0) { - totalInserts += acidStats.inserts; - } - } - return totalInserts; - } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. 
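A hedged sketch (not from the patch) of how the test-only override in the constructor above would be driven; the keys are the ConfVars names used verbatim in that code, applied to a plain Hadoop Configuration.
// Force the legacy V0 encoding while in test mode; the OrcRecordUpdater constructor
// then resolves BucketCodec.getCodec(0) == BucketCodec.V0 and writes raw bucket ids,
// emulating files written before this change.
Configuration conf = new Configuration();
conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.name(), true);
conf.setInt(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION.name(), 0);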
private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { @@ -338,6 +319,7 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); bucketField = fields.get(1); + bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector(); rowIdField = fields.get(2); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); @@ -346,11 +328,11 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { return newInspector; } } - private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); + Integer currentBucket = null; // If this is an insert, originalTransaction should be set to this transaction. If not, // it will be reset by the following if anyway. long originalTransaction = currentTransaction; @@ -359,9 +341,8 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId, originalTransaction = origTxnInspector.get( recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); - } - else if(operation == INSERT_OPERATION) { - rowId += rowIdOffset; + currentBucket = setBucket(bucketInspector.get( + recIdInspector.getStructFieldData(rowIdValue, bucketField)), operation); } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); @@ -372,6 +353,7 @@ else if(operation == INSERT_OPERATION) { writer = OrcFile.createWriter(path, writerOptions); } writer.addRow(item); + restoreBucket(currentBucket, operation); } private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) @@ -388,8 +370,11 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro recIdInspector.getStructFieldData(rowValue, originalTxnField)); rowId = rowIdInspector.get( recIdInspector.getStructFieldData(rowValue, rowIdField)); + Integer currentBucket = null; if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + currentBucket = setBucket(bucketInspector.get( + recIdInspector.getStructFieldData(rowValue, bucketField)), operation); // Initialize a deleteEventWriter if not yet done. (Lazy initialization) if (deleteEventWriter == null) { // Initialize an indexBuilder for deleteEvents. @@ -414,6 +399,7 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. 
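The update/delete branch above takes the bucketProperty from the incoming row rather than from this updater's own options; the setBucket/restoreBucket helpers added at the end of this file swap it in around the write. A rough sketch of that invariant, with hypothetical values (the row was written by writer 2 in statement 1):
// Hypothetical: this updater serves bucket file 0, but the row being deleted was
// originally written by writer 2 in statement 1, so the delete event must carry the
// original encoding for readers to match it against the corresponding insert event.
int original = BucketCodec.V1.encode(
    new AcidOutputFormat.Options(null).bucket(2).statementId(1));
int saved = bucket.get();  // what setBucket(...) returns in the patch
bucket.set(original);      // setBucket(...)
// ... append the delete event ...
bucket.set(saved);         // restoreBucket(...)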
deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); deleteEventWriter.addRow(item); + restoreBucket(currentBucket, operation); } if (operation == UPDATE_OPERATION) { @@ -426,9 +412,6 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; - //this method is almost no-op in hcatalog.streaming case since statementId == 0 is - //always true in that case - rowIdOffset = findRowIdOffsetForInsert(); } if (acidOperationalProperties.isSplitUpdate()) { addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); @@ -442,7 +425,6 @@ public void insert(long currentTransaction, Object row) throws IOException { public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; - rowIdOffset = findRowIdOffsetForInsert(); } if (acidOperationalProperties.isSplitUpdate()) { addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row); @@ -664,4 +646,15 @@ StructField getRecId() { return recId; } } + private void restoreBucket(Integer currentBucket, int operation) { + if(currentBucket != null) { + setBucket(currentBucket, operation); + } + } + private int setBucket(int bucketProperty, int operation) { + assert operation == UPDATE_OPERATION || operation == DELETE_OPERATION; + int currentBucketProperty = bucket.get(); + bucket.set(bucketProperty); + return currentBucketProperty; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680e26..68e4ae8395 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.BitSet; -import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; @@ -37,14 +36,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; import org.apache.orc.impl.AcidStats; import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; @@ -357,7 +354,7 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt throws IOException { final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltas.length > 0) { - int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? 
new ValidReadTxnList() : new ValidReadTxnList(txnString); this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, @@ -471,9 +468,9 @@ public void close() throws IOException { * An implementation for DeleteEventRegistry that optimizes for performance by loading * all the delete events into memory at once from all the delete delta files. * It starts by reading all the delete events through a regular sort merge logic - * into two vectors- one for original transaction id (otid), and the other for row id. - * (In the current version, since the bucket id should be same for all the delete deltas, - * it is not stored). The otids are likely to be repeated very often, as a single transaction + * into 3 vectors- one for original transaction id (otid), one for bucket property and one for + * row id. See {@link BucketCodec} for more about bucket property. + * The otids are likely to be repeated very often, as a single transaction * often deletes thousands of rows. Hence, the otid vector is compressed to only store the * toIndex and fromIndex ranges in the larger row id vector. Now, querying whether a * record id is deleted or not, is done by performing a binary search on the @@ -484,21 +481,22 @@ public void close() throws IOException { */ static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** - * A simple wrapper class to hold the (otid, rowId) pair. + * A simple wrapper class to hold the (otid, bucketProperty, rowId) pair. */ static class DeleteRecordKey implements Comparable { private long originalTransactionId; + /** + * see {@link BucketCodec} + */ + private int bucketProperty; private long rowId; public DeleteRecordKey() { this.originalTransactionId = -1; this.rowId = -1; } - public DeleteRecordKey(long otid, long rowId) { - this.originalTransactionId = otid; - this.rowId = rowId; - } - public void set(long otid, long rowId) { + public void set(long otid, int bucketProperty, long rowId) { this.originalTransactionId = otid; + this.bucketProperty = bucketProperty; this.rowId = rowId; } @@ -510,11 +508,18 @@ public int compareTo(DeleteRecordKey other) { if (originalTransactionId != other.originalTransactionId) { return originalTransactionId < other.originalTransactionId ? -1 : 1; } + if(bucketProperty != other.bucketProperty) { + return bucketProperty < other.bucketProperty ? -1 : 1; + } if (rowId != other.rowId) { return rowId < other.rowId ? -1 : 1; } return 0; } + @Override + public String toString() { + return "otid: " + originalTransactionId + " bucketP:" + bucketProperty + " rowid: " + rowId; + } } /** @@ -529,6 +534,7 @@ public int compareTo(DeleteRecordKey other) { private int indexPtrInBatch; private final int bucketForSplit; // The bucket value should be same for all the records. private final ValidTxnList validTxnList; + private boolean isBucketPropertyRepeating; public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, ValidTxnList validTxnList) throws IOException { @@ -540,6 +546,7 @@ public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, } this.indexPtrInBatch = 0; this.validTxnList = validTxnList; + checkBucketId();//check 1st batch } public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { @@ -551,37 +558,19 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { if (indexPtrInBatch >= batch.size) { // We have exhausted our current batch, read the next batch. 
if (recordReader.nextBatch(batch)) { - // Whenever we are reading a batch, we must ensure that all the records in the batch - // have the same bucket id as the bucket id of the split. If not, throw exception. - // NOTE: this assertion might not hold, once virtual bucketing is in place. However, - // it should be simple to fix that case. Just replace check for bucket equality with - // a check for valid bucket mapping. Until virtual bucketing is added, it means - // either the split computation got messed up or we found some corrupted records. - long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; - if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating) - || (bucketForRecord != bucketForSplit)){ - throw new IOException("Corrupted records with different bucket ids " - + "from the containing bucket file found! Expected bucket id " - + bucketForSplit + ", however found the bucket id " + bucketForRecord); - } + checkBucketId(); indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning. } else { return false; // no more batches to read, exhausted the reader. } } - int originalTransactionIndex = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch; - long originalTransaction = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex]; - long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch]; - int currentTransactionIndex = - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch; - long currentTransaction = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex]; + long currentTransaction = setCurrentDeleteKey(deleteRecordKey); + if(!isBucketPropertyRepeating) { + checkBucketId(deleteRecordKey.bucketProperty); + } ++indexPtrInBatch; if (validTxnList.isTxnValid(currentTransaction)) { isValidNext = true; - deleteRecordKey.set(originalTransaction, rowId); } } return true; @@ -590,8 +579,51 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { public void close() throws IOException { this.recordReader.close(); } + private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) { + int originalTransactionIndex = + batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch; + long originalTransaction = + ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex]; + int bucketPropertyIndex = + batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? 0 : indexPtrInBatch; + int bucketProperty = (int)((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector[bucketPropertyIndex]; + long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch]; + int currentTransactionIndex = + batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 
0 : indexPtrInBatch; + long currentTransaction = + ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex]; + deleteRecordKey.set(originalTransaction, bucketProperty, rowId); + return currentTransaction; + } + private void checkBucketId() throws IOException { + isBucketPropertyRepeating = batch.cols[OrcRecordUpdater.BUCKET].isRepeating; + if(isBucketPropertyRepeating) { + int bucketPropertyFromRecord = (int)((LongColumnVector) + batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + checkBucketId(bucketPropertyFromRecord); + } + } + /** + * Whenever we are reading a batch, we must ensure that all the records in the batch + * have the same bucket id as the bucket id of the split. If not, throw exception. + * NOTE: this assertion might not hold, once virtual bucketing is in place. However, + * it should be simple to fix that case. Just replace check for bucket equality with + * a check for valid bucket mapping. Until virtual bucketing is added, it means + * either the split computation got messed up or we found some corrupted records. + */ + private void checkBucketId(int bucketPropertyFromRecord) throws IOException { + int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord) + .decodeWriterId(bucketPropertyFromRecord); + if(bucketIdFromRecord != bucketForSplit) { + DeleteRecordKey dummy = new DeleteRecordKey(); + long curTxnId = setCurrentDeleteKey(dummy); + throw new IOException("Corrupted records with different bucket ids " + + "from the containing bucket file found! Expected bucket id " + + bucketForSplit + ", however found the bucket id " + bucketIdFromRecord + + " from " + dummy + " curTxnId: " + curTxnId); + } + } } - /** * A CompressedOtid class stores a compressed representation of the original * transaction ids (otids) read from the delete delta files. Since the record ids @@ -600,13 +632,15 @@ public void close() throws IOException { * the toIndex. These fromIndex and toIndex reference the larger vector formed by * concatenating the correspondingly ordered rowIds. */ - private class CompressedOtid implements Comparable { - long originalTransactionId; - int fromIndex; // inclusive - int toIndex; // exclusive + private final class CompressedOtid implements Comparable { + final long originalTransactionId; + final int bucketProperty; + final int fromIndex; // inclusive + final int toIndex; // exclusive - public CompressedOtid(long otid, int fromIndex, int toIndex) { + CompressedOtid(long otid, int bucketProperty, int fromIndex, int toIndex) { this.originalTransactionId = otid; + this.bucketProperty = bucketProperty; this.fromIndex = fromIndex; this.toIndex = toIndex; } @@ -617,10 +651,24 @@ public int compareTo(CompressedOtid other) { if (originalTransactionId != other.originalTransactionId) { return originalTransactionId < other.originalTransactionId ? -1 : 1; } + if(bucketProperty != other.bucketProperty) { + return bucketProperty < other.bucketProperty ? -1 : 1; + } return 0; } } + /** + * Food for thought: + * this is a bit problematic - in order to load ColumnizedDeleteEventRegistry we still open + * all delete deltas at once - possibly causing OOM same as for {@link SortMergedDeleteEventRegistry} + * which uses {@link OrcRawRecordMerger}. Why not load all delete_delta sequentially. 
Each + * dd is sorted by {@link RecordIdentifier} so we could create a BTree like structure where the + * 1st level is an array of originalTransactionId where each entry points at an array + * of bucketIds where each entry points at an array of rowIds. We could probably use ArrayList + * to manage insertion as the structure is built (LinkedList?). This should reduce memory + * footprint (as far as OrcReader to a single reader) - probably bad for LLAP IO + */ private TreeMap sortMerger; private long rowIds[]; private CompressedOtid compressedOtids[]; @@ -628,7 +676,7 @@ public int compareTo(CompressedOtid other) { public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { - int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); + int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); this.sortMerger = new TreeMap(); @@ -695,7 +743,10 @@ private void readAllDeleteEventsFromDeleteDeltas() throws IOException { if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read. int distinctOtids = 0; long lastSeenOtid = -1; + int lastSeenBucketProperty = -1; long otids[] = new long[rowIds.length]; + int[] bucketProperties = new int [rowIds.length]; + int index = 0; while (!sortMerger.isEmpty()) { // The sortMerger is a heap data structure that stores a pair of @@ -711,11 +762,14 @@ private void readAllDeleteEventsFromDeleteDeltas() throws IOException { DeleteRecordKey deleteRecordKey = entry.getKey(); DeleteReaderValue deleteReaderValue = entry.getValue(); otids[index] = deleteRecordKey.originalTransactionId; + bucketProperties[index] = deleteRecordKey.bucketProperty; rowIds[index] = deleteRecordKey.rowId; ++index; - if (lastSeenOtid != deleteRecordKey.originalTransactionId) { + if (lastSeenOtid != deleteRecordKey.originalTransactionId || + lastSeenBucketProperty != deleteRecordKey.bucketProperty) { ++distinctOtids; lastSeenOtid = deleteRecordKey.originalTransactionId; + lastSeenBucketProperty = deleteRecordKey.bucketProperty; } if (deleteReaderValue.next(deleteRecordKey)) { sortMerger.put(deleteRecordKey, deleteReaderValue); @@ -729,20 +783,24 @@ private void readAllDeleteEventsFromDeleteDeltas() throws IOException { // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid. 
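To make the compression just described concrete, a small hand-worked illustration (values are hypothetical; B0 and B1 stand for two distinct encoded bucket properties), followed by roughly what isDeleted() does with it:
// Sorted (otid, bucketProperty, rowId) stream coming out of the merge:
//   (5,B0,0) (5,B0,7) (5,B1,3) (9,B0,1)
// rowIds[]          = {0, 7, 3, 1}
// compressedOtids[] = { (5,B0,from=0,to=2), (5,B1,from=2,to=3), (9,B0,from=3,to=4) }
CompressedOtid key = new CompressedOtid(otid, bucketProperty, -1, -1);  // dummy search key
int pos = Arrays.binarySearch(compressedOtids, key);
boolean deleted = pos >= 0 && Arrays.binarySearch(rowIds,
    compressedOtids[pos].fromIndex, compressedOtids[pos].toIndex, rowId) >= 0;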
this.compressedOtids = new CompressedOtid[distinctOtids]; lastSeenOtid = otids[0]; + lastSeenBucketProperty = bucketProperties[0]; int fromIndex = 0, pos = 0; for (int i = 1; i < otids.length; ++i) { - if (otids[i] != lastSeenOtid) { - compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i); + if (otids[i] != lastSeenOtid || lastSeenBucketProperty != bucketProperties[i]) { + compressedOtids[pos] = + new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, i); lastSeenOtid = otids[i]; + lastSeenBucketProperty = bucketProperties[i]; fromIndex = i; ++pos; } } // account for the last distinct otid - compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length); + compressedOtids[pos] = + new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, otids.length); } - private boolean isDeleted(long otid, long rowId) { + private boolean isDeleted(long otid, int bucketProperty, long rowId) { if (compressedOtids == null || rowIds == null) { return false; } @@ -756,8 +814,8 @@ private boolean isDeleted(long otid, long rowId) { || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) { return false; } - // Create a dummy key for searching the otid in the compressed otid ranges. - CompressedOtid key = new CompressedOtid(otid, -1, -1); + // Create a dummy key for searching the otid/bucket in the compressed otid ranges. + CompressedOtid key = new CompressedOtid(otid, bucketProperty, -1, -1); int pos = Arrays.binarySearch(compressedOtids, key); if (pos >= 0) { // Otid with the given value found! Searching now for rowId... @@ -789,6 +847,12 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + long[] bucketProperties = + batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector; + int repeatedBucketProperty = (bucketProperties != null) ? -1 + : (int)((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + long[] rowIdVector = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; @@ -797,8 +861,10 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex] : repeatedOriginalTransaction ; + int bucketProperty = bucketProperties != null ? 
(int)bucketProperties[setBitIndex] + : repeatedBucketProperty; long rowId = rowIdVector[setBitIndex]; - if (isDeleted(otid, rowId)) { + if (isDeleted(otid, bucketProperty, rowId)) { selectedBitSet.clear(setBitIndex); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 0541a4035a..78c511b016 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -696,15 +696,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) if(numWhenMatchedUpdateClauses > 1) { throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd()); } - assert numInsertClauses < 2; - if(numInsertClauses == 1 && numWhenMatchedUpdateClauses == 1) { - if(AcidUtils.getAcidOperationalProperties(targetTable).isSplitUpdate()) { - throw new IllegalStateException("Tables with " + - hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "=" + - TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY + " currently do not " + - "support MERGE with both Insert and Update clauses."); - } - } + assert numInsertClauses < 2: "too many Insert clauses"; } if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java index 461ef86b83..1de7604432 100755 --- ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastDoubleToLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.CastStringToLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.CastTimestampToLong; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -220,7 +221,9 @@ public IntWritable evaluate(RecordIdentifier i) { if (i == null) { return null; } else { - intWritable.set(i.getBucketId()); + BucketCodec decoder = + BucketCodec.determineVersion(i.getBucketProperty()); + intWritable.set(decoder.decodeWriterId(i.getBucketProperty())); return intWritable; } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 7c66955e14..530c7def88 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -492,53 +492,10 @@ private static void pause(int timeMillis) { * sorts rows in dictionary order */ private List stringifyValues(int[][] rowsIn) { - assert rowsIn.length > 0; - int[][] rows = rowsIn.clone(); - Arrays.sort(rows, new RowComp()); - List rs = new ArrayList(); - for(int[] row : rows) { - assert row.length > 0; - StringBuilder sb = new StringBuilder(); - for(int value : row) { - sb.append(value).append("\t"); - } - sb.setLength(sb.length() - 1); - rs.add(sb.toString()); - } - return rs; - } - private static final class RowComp implements Comparator { - @Override - public int compare(int[] row1, int[] row2) { - assert row1 != null && row2 != null && row1.length == row2.length; - for(int i = 0; i < row1.length; 
i++) { - int comp = Integer.compare(row1[i], row2[i]); - if(comp != 0) { - return comp; - } - } - return 0; - } + return TestTxnCommands2.stringifyValues(rowsIn); } private String makeValuesClause(int[][] rows) { - assert rows.length > 0; - StringBuilder sb = new StringBuilder("values"); - for(int[] row : rows) { - assert row.length > 0; - if(row.length > 1) { - sb.append("("); - } - for(int value : row) { - sb.append(value).append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - if(row.length > 1) { - sb.append(")"); - } - sb.append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - return sb.toString(); + return TestTxnCommands2.makeValuesClause(rows); } private List runStatementOnDriver(String stmt) throws Exception { @@ -558,7 +515,6 @@ private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throw throw new RuntimeException("Didn't get expected failure!"); } -// @Ignore @Test public void exchangePartition() throws Exception { runStatementOnDriver("create database ex1"); @@ -756,9 +712,9 @@ public void testMergeUpdateDelete() throws Exception { runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); String query = "merge into " + Table.ACIDTBL + " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " + - "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + - "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " + - "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; + "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + //updates (2,1) -> (2,0) + "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +//deletes (4,3) + "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";//inserts (11,11) runStatementOnDriver(query); List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 5786c4f659..8e478404f7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; @@ -87,19 +88,27 @@ protected Driver d; protected static enum Table { ACIDTBL("acidTbl"), - ACIDTBLPART("acidTblPart"), + ACIDTBLPART("acidTblPart", "p"), NONACIDORCTBL("nonAcidOrcTbl"), - NONACIDPART("nonAcidPart"), - NONACIDPART2("nonAcidPart2"), - ACIDNESTEDPART("acidNestedPart"); + NONACIDPART("nonAcidPart", "p"), + NONACIDPART2("nonAcidPart2", "p2"), + ACIDNESTEDPART("acidNestedPart", "p,q"); private final String name; + private final String partitionColumns; @Override public String toString() { return name; } + String getPartitionColumns() { + return partitionColumns; + } Table(String name) { + this(name, null); + } + Table(String name, String partitionColumns) { this.name = name; + this.partitionColumns = partitionColumns; } } @@ -599,7 +608,7 @@ public void testNonAcidToAcidConversion3() throws Exception { List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); int [][] resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) 
from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -615,7 +624,7 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -686,7 +695,7 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -722,7 +731,7 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -1438,12 +1447,13 @@ public void testMergeWithPredicate() throws Exception { String query = "merge into " + Table.ACIDTBL + " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " + "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " + - "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)"; + "WHEN NOT MATCHED and s.b2 >= 8 THEN INSERT VALUES(s.a2, s.b2)"; runStatementOnDriver(query); r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}}; + int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{8,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); + assertUniqueID(Table.ACIDTBL); } /** @@ -1471,6 +1481,7 @@ public void testMerge2() throws Exception { r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); + assertUniqueID(Table.ACIDTBL); } /** @@ -1497,27 +1508,34 @@ public void testMerge3() throws Exception { int[][] rExpected = {{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); } - /** - * https://hortonworks.jira.com/browse/BUG-66580 - * @throws Exception - */ - @Ignore @Test public void testMultiInsert() throws Exception { - runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + - "partitioned by (z int) clustered by (a) into 2 buckets " + - "stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("create temporary table if not exists data1 (x int)"); -// runStatementOnDriver("create temporary table if not exists data2 (x int)"); - - runStatementOnDriver("insert 
into data1 values (1),(2),(3)"); -// runStatementOnDriver("insert into data2 values (4),(5),(6)"); + runStatementOnDriver("insert into data1 values (1),(2),(1)"); d.destroy(); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); d = new Driver(hiveConf); - List r = runStatementOnDriver(" from data1 " + - "insert into srcpart partition(z) select 0,0,1,x " + - "insert into srcpart partition(z=1) select 0,0,1"); + + runStatementOnDriver(" from data1 " + + "insert into " + Table.ACIDTBLPART + " partition(p) select 0, 0, 'p' || x " + + + "insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0, 1"); + /** + * Using {@link BucketCodec.V0} the output + * is missing 1 of the (p1,0,1) rows because they have the same ROW__ID and only differ by + * StatementId so {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger} skips one. + * With split update (and V0), the data is read correctly (insert deltas are now the base) but we still + * should get duplicate ROW__IDs. + */ + List r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[p1\t0\t0, p1\t0\t0, p1\t0\t1, p1\t0\t1, p1\t0\t1, p2\t0\t0]", r.toString()); + assertUniqueID(Table.ACIDTBLPART); + /** + * this delete + select covers VectorizedOrcAcidRowBatchReader + */ + runStatementOnDriver("delete from " + Table.ACIDTBLPART); + r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[]", r.toString()); } /** * Investigating DP and WriteEntity, etc @@ -1583,6 +1601,8 @@ public void testDynamicPartitionsMerge() throws Exception { r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); String result= r1.toString(); Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result); + //note: inserts go into 'new part'... 
so this won't fail + assertUniqueID(Table.ACIDTBLPART); } /** * Using nested partitions and thus DummyPartition @@ -1605,6 +1625,8 @@ public void testDynamicPartitionsMerge2() throws Exception { "when not matched then insert values(s.a, s.b, 3,4)"); r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b"); Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1); + //insert of merge lands in part (3,4) - no updates land there + assertUniqueID(Table.ACIDNESTEDPART); } @Ignore("Covered elsewhere") @Test @@ -1641,6 +1663,41 @@ public void testValuesSource() throws Exception { Assert.assertEquals(stringifyValues(rExpected), r); } + @Test + public void testBucketCodec() throws Exception { + d.destroy(); + //insert data in "legacy" format + hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 0); + d = new Driver(hiveConf); + + int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); + + d.destroy(); + hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 1); + d = new Driver(hiveConf); + //do some operations with new format + runStatementOnDriver("update " + Table.ACIDTBL + " set b=11 where a in (5,7)"); + runStatementOnDriver("insert into " + Table.ACIDTBL + " values(11,11)"); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7"); + + //make sure we get the right data back before/after compactions + List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + int[][] rExpected = {{2,1},{4,3},{5,11},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), r); + + runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MINOR'"); + runWorker(hiveConf); + + r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), r); + + runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MAJOR'"); + runWorker(hiveConf); + + r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), r); + } /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order @@ -1661,7 +1718,7 @@ public void testValuesSource() throws Exception { } return rs; } - private static final class RowComp implements Comparator { + static class RowComp implements Comparator { @Override public int compare(int[] row1, int[] row2) { assert row1 != null && row2 != null && row1.length == row2.length; @@ -1674,7 +1731,7 @@ public int compare(int[] row1, int[] row2) { return 0; } } - String makeValuesClause(int[][] rows) { + static String makeValuesClause(int[][] rows) { assert rows.length > 0; StringBuilder sb = new StringBuilder("values"); for(int[] row : rows) { @@ -1705,4 +1762,19 @@ String makeValuesClause(int[][] rows) { d.getResults(rs); return rs; } + final void assertUniqueID(Table table) throws Exception { + String partCols = table.getPartitionColumns(); + //check to make sure there are no duplicate ROW__IDs - HIVE-16832 + StringBuilder sb = new StringBuilder("select "); + if(partCols != null && partCols.length() > 0) { + sb.append(partCols).append(","); + } + sb.append(" ROW__ID, count(*) from ").append(table).append(" group by "); + if(partCols != null && partCols.length() > 0) { + sb.append(partCols).append(","); + } + sb.append("ROW__ID having count(*) > 1"); + List r = 
runStatementOnDriver(sb.toString()); + Assert.assertTrue("Duplicate ROW__ID: " + r.toString(),r.size() == 0); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java index ea5ecbc842..520e958af3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java @@ -542,45 +542,4 @@ public void testNonAcidToAcidConversion3() throws Exception { resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } - @Ignore("HIVE-14947") - @Test - @Override - public void testDynamicPartitionsMerge() throws Exception {} - @Ignore("HIVE-14947") - @Test - @Override - public void testDynamicPartitionsMerge2() throws Exception {} - @Ignore("HIVE-14947") - @Test - @Override - public void testMerge() throws Exception {} - - /** - * todo: remove this test once HIVE-14947 is done (parent class has a better version) - */ - @Test - @Override - public void testMerge2() throws Exception { - int[][] baseValsOdd = {{5,5},{11,11}}; - int[][] baseValsEven = {{2,2},{4,44}}; - runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd)); - runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven)); - int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); - List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(vals), r); - String query = "merge into " + Table.ACIDTBL + - " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " + - "WHEN MATCHED THEN UPDATE set b = source.b2 "; - r = runStatementOnDriver(query); - - r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,2},{4,44},{5,5},{7,8}}; - Assert.assertEquals(stringifyValues(rExpected), r); - - } - @Ignore("HIVE-14947") - @Test - @Override - public void testMergeWithPredicate() throws Exception {} } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index a7ff9a3749..c9acdc8c55 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -113,25 +113,25 @@ public void testParsing() throws Exception { assertEquals(true, opts.isWritingBase()); assertEquals(567, opts.getMaximumTransactionId()); assertEquals(0, opts.getMinimumTransactionId()); - assertEquals(123, opts.getBucket()); + assertEquals(123, opts.getBucketId()); opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delta_000005_000006/bucket_00001"), conf); assertEquals(false, opts.getOldStyle()); assertEquals(false, opts.isWritingBase()); assertEquals(6, opts.getMaximumTransactionId()); assertEquals(5, opts.getMinimumTransactionId()); - assertEquals(1, opts.getBucket()); + assertEquals(1, opts.getBucketId()); opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delete_delta_000005_000006/bucket_00001"), conf); assertEquals(false, opts.getOldStyle()); assertEquals(false, opts.isWritingBase()); assertEquals(6, opts.getMaximumTransactionId()); assertEquals(5, opts.getMinimumTransactionId()); - assertEquals(1, opts.getBucket()); + assertEquals(1, opts.getBucketId()); opts = 
AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf); assertEquals(true, opts.getOldStyle()); assertEquals(true, opts.isWritingBase()); - assertEquals(123, opts.getBucket()); + assertEquals(123, opts.getBucketId()); assertEquals(0, opts.getMinimumTransactionId()); assertEquals(0, opts.getMaximumTransactionId()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index bb7985711f..a4ad2e07d4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -81,7 +81,6 @@ import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -2445,14 +2444,14 @@ public void testCombinationInputFormatWithAcid() throws Exception { assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000", split.getPath().toString()); assertEquals(0, split.getStart()); - assertEquals(607, split.getLength()); + assertEquals(648, split.getLength()); split = (HiveInputFormat.HiveInputSplit) splits[1]; assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", split.inputFormatClassName()); assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001", split.getPath().toString()); assertEquals(0, split.getStart()); - assertEquals(629, split.getLength()); + assertEquals(674, split.getLength()); CombineHiveInputFormat.CombineHiveInputSplit combineSplit = (CombineHiveInputFormat.CombineHiveInputSplit) splits[2]; assertEquals(BUCKETS, combineSplit.getNumPaths()); @@ -3852,7 +3851,7 @@ public void testColumnProjectionWithAcid() throws Exception { OrcStruct struct = reader.createValue(); while (reader.next(id, struct)) { assertEquals("id " + record, record, id.getRowId()); - assertEquals("bucket " + record, 0, id.getBucketId()); + assertEquals("bucket " + record, 0, id.getBucketProperty()); assertEquals("trans " + record, 1, id.getTransactionId()); assertEquals("a " + record, 42 * record, ((IntWritable) struct.getFieldValue(0)).get()); @@ -3879,7 +3878,7 @@ public void testColumnProjectionWithAcid() throws Exception { struct = reader.createValue(); while (reader.next(id, struct)) { assertEquals("id " + record, record, id.getRowId()); - assertEquals("bucket " + record, 0, id.getBucketId()); + assertEquals("bucket " + record, 0, id.getBucketProperty()); assertEquals("trans " + record, 1, id.getTransactionId()); assertEquals("a " + record, 42 * record, ((IntWritable) struct.getFieldValue(0)).get()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 1ce1bfb1dd..f30408818d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.orc.CompressionKind; import org.apache.orc.MemoryManager; import org.apache.orc.StripeInformation; @@ -38,10 +39,8 @@ import 
org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.IntWritable; @@ -191,14 +190,14 @@ public void testReaderPair() throws Exception { new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); - assertEquals(20, key.getBucketId()); + assertEquals(20, key.getBucketProperty()); assertEquals(40, key.getRowId()); assertEquals(120, key.getCurrentTransactionId()); assertEquals("third", value(pair.nextRecord)); pair.next(pair.nextRecord); assertEquals(40, key.getTransactionId()); - assertEquals(50, key.getBucketId()); + assertEquals(50, key.getBucketProperty()); assertEquals(60, key.getRowId()); assertEquals(130, key.getCurrentTransactionId()); assertEquals("fourth", value(pair.nextRecord)); @@ -217,35 +216,35 @@ public void testReaderPairNoMin() throws Exception { new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); - assertEquals(20, key.getBucketId()); + assertEquals(20, key.getBucketProperty()); assertEquals(20, key.getRowId()); assertEquals(100, key.getCurrentTransactionId()); assertEquals("first", value(pair.nextRecord)); pair.next(pair.nextRecord); assertEquals(10, key.getTransactionId()); - assertEquals(20, key.getBucketId()); + assertEquals(20, key.getBucketProperty()); assertEquals(30, key.getRowId()); assertEquals(110, key.getCurrentTransactionId()); assertEquals("second", value(pair.nextRecord)); pair.next(pair.nextRecord); assertEquals(10, key.getTransactionId()); - assertEquals(20, key.getBucketId()); + assertEquals(20, key.getBucketProperty()); assertEquals(40, key.getRowId()); assertEquals(120, key.getCurrentTransactionId()); assertEquals("third", value(pair.nextRecord)); pair.next(pair.nextRecord); assertEquals(40, key.getTransactionId()); - assertEquals(50, key.getBucketId()); + assertEquals(50, key.getBucketProperty()); assertEquals(60, key.getRowId()); assertEquals(130, key.getCurrentTransactionId()); assertEquals("fourth", value(pair.nextRecord)); pair.next(pair.nextRecord); assertEquals(40, key.getTransactionId()); - assertEquals(50, key.getBucketId()); + assertEquals(50, key.getBucketProperty()); assertEquals(61, key.getRowId()); assertEquals(140, key.getCurrentTransactionId()); assertEquals("fifth", value(pair.nextRecord)); @@ -294,14 +293,14 @@ public void testOriginalReaderPair() throws Exception { new Reader.Options().include(includes)); RecordReader recordReader = pair.recordReader; assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(2, key.getRowId()); assertEquals(0, key.getCurrentTransactionId()); assertEquals("third", value(pair.nextRecord)); pair.next(pair.nextRecord); assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(3, key.getRowId()); assertEquals(0, 
key.getCurrentTransactionId()); assertEquals("fourth", value(pair.nextRecord)); @@ -323,35 +322,35 @@ public void testOriginalReaderPairNoMin() throws Exception { new Reader.Options()); assertEquals("first", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(0, key.getRowId()); assertEquals(0, key.getCurrentTransactionId()); pair.next(pair.nextRecord); assertEquals("second", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(1, key.getRowId()); assertEquals(0, key.getCurrentTransactionId()); pair.next(pair.nextRecord); assertEquals("third", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(2, key.getRowId()); assertEquals(0, key.getCurrentTransactionId()); pair.next(pair.nextRecord); assertEquals("fourth", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(3, key.getRowId()); assertEquals(0, key.getCurrentTransactionId()); pair.next(pair.nextRecord); assertEquals("fifth", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); - assertEquals(10, key.getBucketId()); + assertEquals(10, key.getBucketProperty()); assertEquals(4, key.getRowId()); assertEquals(0, key.getCurrentTransactionId()); @@ -434,13 +433,13 @@ public void testNewBase() throws Exception { assertEquals(true, merger.next(id, event)); assertEquals(10, id.getTransactionId()); - assertEquals(20, id.getBucketId()); + assertEquals(20, id.getBucketProperty()); assertEquals(40, id.getRowId()); assertEquals("third", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(40, id.getTransactionId()); - assertEquals(50, id.getBucketId()); + assertEquals(50, id.getBucketProperty()); assertEquals(60, id.getRowId()); assertEquals("fourth", getValue(event)); @@ -566,6 +565,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { // write the base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .inspector(inspector).bucket(BUCKET).finalDestination(root); + final int BUCKET_PROPERTY = BucketCodec.V1.encode(options); if(!use130Format) { options.statementId(-1); } @@ -579,11 +579,11 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { // write a delta ru = of.getRecordUpdater(root, options.writingBase(false) .minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1)); - ru.update(200, new MyRow("update 1", 0, 0, BUCKET)); - ru.update(200, new MyRow("update 2", 2, 0, BUCKET)); - ru.update(200, new MyRow("update 3", 3, 0, BUCKET)); - ru.delete(200, new MyRow("", 7, 0, BUCKET)); - ru.delete(200, new MyRow("", 8, 0, BUCKET)); + ru.update(200, new MyRow("update 1", 0, 0, BUCKET_PROPERTY)); + ru.update(200, new MyRow("update 2", 2, 0, BUCKET_PROPERTY)); + ru.update(200, new MyRow("update 3", 3, 0, BUCKET_PROPERTY)); + ru.delete(200, new MyRow("", 7, 0, BUCKET_PROPERTY)); + ru.delete(200, new MyRow("", 8, 0, BUCKET_PROPERTY)); ru.close(false); ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE); @@ -615,64 +615,64 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, 
OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 0, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id); assertEquals("update 1", getValue(event)); assertFalse(merger.isDelete(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 1, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id); assertEquals("second", getValue(event)); assertFalse(merger.isDelete(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 2, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id); assertEquals("update 2", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 3, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id); assertEquals("update 3", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 4, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id); assertEquals("fifth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 5, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id); assertEquals("sixth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 6, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id); assertEquals("seventh", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertTrue(merger.isDelete(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 9, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id); assertEquals("tenth", getValue(event)); assertEquals(false, merger.next(id, event)); @@ -686,90 +686,90 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 0, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id); assertEquals("update 1", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 0, 0), id); + assertEquals(new ReaderKey(0, 
BUCKET_PROPERTY, 0, 0), id); assertEquals("first", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 1, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id); assertEquals("second", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 2, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id); assertEquals("update 2", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 2, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id); assertEquals("third", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 3, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id); assertEquals("update 3", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 3, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id); assertEquals("fourth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 4, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id); assertEquals("fifth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 5, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id); assertEquals("sixth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 6, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id); assertEquals("seventh", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 7, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id); assertEquals("eighth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 8, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id); assertEquals("ninth", getValue(event)); assertEquals(true, merger.next(id, 
event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 9, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id); assertEquals("tenth", getValue(event)); assertEquals(false, merger.next(id, event)); @@ -786,7 +786,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { LOG.info("id = " + id + "event = " + event); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, i, 0), id); + assertEquals(new ReaderKey(0, BUCKET_PROPERTY, i, 0), id); assertEquals(values[i], getValue(event)); } @@ -974,6 +974,9 @@ public synchronized void addedRow(int rows) throws IOException { new OrcRecordUpdater.OrcOptions(conf) .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) .bucket(BUCKET).inspector(inspector).filesystem(fs); + + final int BUCKET_PROPERTY = BucketCodec.V1.encode(options); + options.orcOptions(OrcFile.writerOptions(conf) .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE) .memory(mgr).batchSize(2)); @@ -994,10 +997,10 @@ public synchronized void addedRow(int rows) throws IOException { "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY)); } } - ru.delete(100, new BigRow(9, 0, BUCKET)); + ru.delete(100, new BigRow(9, 0, BUCKET_PROPERTY)); ru.close(false); // write a delta @@ -1006,10 +1009,10 @@ public synchronized void addedRow(int rows) throws IOException { values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY)); } } - ru.delete(100, new BigRow(8, 0, BUCKET)); + ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY)); ru.close(false); InputFormat inf = new OrcInputFormat(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index 67c473e188..be155176fc 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -143,27 +144,27 @@ public void testWriter() throws Exception { OrcRecordUpdater.getOperation(row)); assertEquals(11, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(11, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, getBucketId(row)); assertEquals(0, OrcRecordUpdater.getRowId(row)); assertEquals("first", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(1, OrcRecordUpdater.getRowId(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, getBucketId(row)); assertEquals("second", 
OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(2, OrcRecordUpdater.getRowId(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, getBucketId(row)); assertEquals("third", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(12, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(12, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, getBucketId(row)); assertEquals(0, OrcRecordUpdater.getRowId(row)); assertEquals("fourth", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); @@ -184,7 +185,11 @@ public void testWriter() throws Exception { assertEquals(false, fs.exists(sidePath)); } - + private static int getBucketId(OrcStruct row) { + int bucketValue = OrcRecordUpdater.getBucket(row); + return + BucketCodec.determineVersion(bucketValue).decodeWriterId(bucketValue); + } @Test public void testWriterTblProperties() throws Exception { Path root = new Path(workDir, "testWriterTblProperties"); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf13129b8..3ed047af4b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; @@ -72,6 +73,7 @@ DummyRow(long val, long rowId, long origTxn, int bucket) { field = new LongWritable(val); + bucket = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucket)); ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } diff --git ql/src/test/queries/clientpositive/acid_bucket_pruning.q ql/src/test/queries/clientpositive/acid_bucket_pruning.q index 24f8de1731..d8d59b241c 100644 --- ql/src/test/queries/clientpositive/acid_bucket_pruning.q +++ ql/src/test/queries/clientpositive/acid_bucket_pruning.q @@ -18,4 +18,10 @@ INSERT INTO TABLE acidTblDefault VALUES (1); -- Exactly one of the buckets should be selected out of the 16 buckets -- by the following selection query. 
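The test changes just above (getBucketId() in TestOrcRecordUpdater, DummyRow in TestVectorizedOrcAcidRowBatchReader) and the bucket-pruning query below both depend on recovering the writer id from the encoded bucket property. A toy round-trip sketch of that idea follows; the bit layout here is assumed purely for illustration and is not BucketCodec's real V1 layout.

public class BucketPropertySketch {
  // Assumed toy layout: top 3 bits carry a codec version, low 12 bits carry the writer id.
  // (The actual field widths and positions live in org.apache.hadoop.hive.ql.io.BucketCodec.)
  static int encode(int version, int writerId) {
    return (version << 29) | (writerId & 0xFFF);
  }
  static int decodeVersion(int bucketProperty) {
    return bucketProperty >>> 29;
  }
  static int decodeWriterId(int bucketProperty) {
    return bucketProperty & 0xFFF;
  }

  public static void main(String[] args) {
    int prop = encode(1, 20);
    System.out.println(decodeVersion(prop));  // 1
    System.out.println(decodeWriterId(prop)); // 20
    // In this toy scheme version 0 leaves small writer ids unchanged, which is the
    // spirit of the legacy format exercised via hive.test.bucketcodec.version=0 above.
    System.out.println(encode(0, 20));        // 20
  }
}

This is why UDFToInteger and the bucket assertions in these tests now decode the writer id first instead of comparing the raw value stored in the bucket column.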
EXPLAIN EXTENDED -SELECT * FROM acidTblDefault WHERE a = 1; \ No newline at end of file +SELECT * FROM acidTblDefault WHERE a = 1; + +select count(*) from acidTblDefault WHERE a = 1; + +set hive.tez.bucket.pruning=false; + +select count(*) from acidTblDefault WHERE a = 1; diff --git ql/src/test/results/clientpositive/acid_table_stats.q.out ql/src/test/results/clientpositive/acid_table_stats.q.out index 195278a6eb..f2181a1ae5 100644 --- ql/src/test/results/clientpositive/acid_table_stats.q.out +++ ql/src/test/results/clientpositive/acid_table_stats.q.out @@ -98,7 +98,7 @@ Partition Parameters: numFiles 2 numRows 0 rawDataSize 0 - totalSize 3852 + totalSize 3949 #### A masked pattern was here #### # Storage Information @@ -136,9 +136,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid - Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3949 Basic stats: PARTIAL Column stats: NONE Select Operator - Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3949 Basic stats: PARTIAL Column stats: NONE Group By Operator aggregations: count() mode: hash @@ -215,7 +215,7 @@ Partition Parameters: numFiles 2 numRows 1000 rawDataSize 208000 - totalSize 3852 + totalSize 3949 #### A masked pattern was here #### # Storage Information @@ -264,7 +264,7 @@ Partition Parameters: numFiles 2 numRows 1000 rawDataSize 208000 - totalSize 3852 + totalSize 3949 #### A masked pattern was here #### # Storage Information @@ -391,7 +391,7 @@ Partition Parameters: numFiles 4 numRows 1000 rawDataSize 208000 - totalSize 7718 + totalSize 7903 #### A masked pattern was here #### # Storage Information @@ -440,7 +440,7 @@ Partition Parameters: numFiles 4 numRows 2000 rawDataSize 416000 - totalSize 7718 + totalSize 7903 #### A masked pattern was here #### # Storage Information diff --git ql/src/test/results/clientpositive/autoColumnStats_4.q.out ql/src/test/results/clientpositive/autoColumnStats_4.q.out index c3ad1920b5..fe3b9e53ef 100644 --- ql/src/test/results/clientpositive/autoColumnStats_4.q.out +++ ql/src/test/results/clientpositive/autoColumnStats_4.q.out @@ -201,7 +201,7 @@ Table Parameters: numFiles 2 numRows 0 rawDataSize 0 - totalSize 1724 + totalSize 1798 transactional true #### A masked pattern was here #### @@ -244,7 +244,7 @@ Table Parameters: numFiles 4 numRows 0 rawDataSize 0 - totalSize 2763 + totalSize 2909 transactional true #### A masked pattern was here #### diff --git ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out index 0d8e7790e1..1564106b6d 100644 --- ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out +++ ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out @@ -308,7 +308,7 @@ Table Parameters: numFiles 1 numRows 0 rawDataSize 0 - totalSize 1512 + totalSize 1555 transactional true #### A masked pattern was here #### @@ -336,9 +336,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid_ivot - Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 1555 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 1555 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash @@ 
-430,7 +430,7 @@ Table Parameters: numFiles 2 numRows 0 rawDataSize 0 - totalSize 3024 + totalSize 3110 transactional true #### A masked pattern was here #### @@ -458,9 +458,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid_ivot - Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 3110 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 3110 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash @@ -538,7 +538,7 @@ Table Parameters: numFiles 3 numRows 0 rawDataSize 0 - totalSize 380261 + totalSize 380347 transactional true #### A masked pattern was here #### @@ -566,9 +566,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: acid_ivot - Statistics: Num rows: 1 Data size: 380261 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 380347 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 1 Data size: 380261 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 380347 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() mode: hash diff --git ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out index 357ae7bdaf..a61277c091 100644 --- ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out +++ ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out @@ -43,22 +43,22 @@ STAGE PLANS: alias: acidtbldefault filterExpr: (a = 1) (type: boolean) buckets included: [1,] of 16 - Statistics: Num rows: 7972 Data size: 31888 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 8142 Data size: 32570 Basic stats: COMPLETE Column stats: NONE GatherStats: false Filter Operator isSamplingPred: false predicate: (a = 1) (type: boolean) - Statistics: Num rows: 3986 Data size: 15944 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 4071 Data size: 16285 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: 1 (type: int) outputColumnNames: _col0 - Statistics: Num rows: 3986 Data size: 15944 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 4071 Data size: 16285 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false GlobalTableId: 0 #### A masked pattern was here #### NumFilesPerFileSink: 1 - Statistics: Num rows: 3986 Data size: 15944 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 4071 Data size: 16285 Basic stats: COMPLETE Column stats: NONE #### A masked pattern was here #### table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat @@ -100,7 +100,7 @@ STAGE PLANS: serialization.ddl struct acidtbldefault { i32 a} serialization.format 1 serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde - totalSize 31888 + totalSize 32570 transactional true transactional_properties default #### A masked pattern was here #### @@ -123,7 +123,7 @@ STAGE PLANS: serialization.ddl struct acidtbldefault { i32 a} serialization.format 1 serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde - totalSize 31888 + totalSize 32570 transactional true transactional_properties default #### A masked pattern was here #### @@ -139,3 +139,21 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: select count(*) from acidTblDefault WHERE a = 1 +PREHOOK: type: 
QUERY +PREHOOK: Input: default@acidtbldefault +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from acidTblDefault WHERE a = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbldefault +#### A masked pattern was here #### +1 +PREHOOK: query: select count(*) from acidTblDefault WHERE a = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbldefault +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from acidTblDefault WHERE a = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbldefault +#### A masked pattern was here #### +1 diff --git ql/src/test/results/clientpositive/row__id.q.out ql/src/test/results/clientpositive/row__id.q.out index 43c9b600ca..059ace91b5 100644 --- ql/src/test/results/clientpositive/row__id.q.out +++ ql/src/test/results/clientpositive/row__id.q.out @@ -56,23 +56,23 @@ STAGE PLANS: Map Operator Tree: TableScan alias: hello_acid - Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE Select Operator expressions: ROW__ID.transactionid (type: bigint) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE Reduce Output Operator key expressions: _col0 (type: bigint) sort order: + - Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: bigint) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -117,17 +117,17 @@ STAGE PLANS: Map Operator Tree: TableScan alias: hello_acid - Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE Filter Operator predicate: (ROW__ID.transactionid = 3) (type: boolean) - Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: ROW__ID.transactionid (type: bigint) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
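To round out the read path touched earlier in this patch, here is a simplified, self-contained sketch of the isDeleted(otid, bucketProperty, rowId) lookup: binary-search the sorted (otid, bucketProperty) ranges with a dummy key, then binary-search the rowIds slice that the matching range covers. Class and method names are hypothetical; this is not the ColumnizedDeleteEventRegistry code itself.

import java.util.Arrays;

public class DeleteLookupSketch {
  static final class Range implements Comparable<Range> {
    final long otid; final int bucketProperty; final int from; final int to;
    Range(long otid, int bucketProperty, int from, int to) {
      this.otid = otid; this.bucketProperty = bucketProperty; this.from = from; this.to = to;
    }
    @Override
    public int compareTo(Range o) {
      if (otid != o.otid) {
        return Long.compare(otid, o.otid);
      }
      return Integer.compare(bucketProperty, o.bucketProperty); // bucketProperty is part of the key now
    }
  }

  static boolean isDeleted(Range[] ranges, long[] rowIds, long otid, int bucketProperty, long rowId) {
    // Dummy key: from/to do not participate in the comparison.
    int pos = Arrays.binarySearch(ranges, new Range(otid, bucketProperty, -1, -1));
    if (pos < 0) {
      return false; // no delete events at all for this (otid, bucketProperty)
    }
    Range r = ranges[pos];
    // rowIds is sorted within each range, so a second binary search finds the row.
    return Arrays.binarySearch(rowIds, r.from, r.to, rowId) >= 0;
  }

  public static void main(String[] args) {
    Range[] ranges = { new Range(1, 536870912, 0, 2), new Range(2, 536870912, 2, 3) };
    long[] rowIds = { 0, 7, 4 };
    System.out.println(isDeleted(ranges, rowIds, 1, 536870912, 7)); // true
    System.out.println(isDeleted(ranges, rowIds, 1, 536936448, 7)); // false: same otid, different writer
  }
}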