diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index e409e75571..849f6cca69 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -267,7 +267,8 @@ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxT .minimumTransactionId(minTxnId) .maximumTransactionId(maxTxnID) .statementId(-1) - .finalDestination(partitionPath)); + .finalDestination(partitionPath) + .numBuckets(tbl.getSd().getNumBuckets())); } catch (SerDeException e) { throw new SerializationError("Failed to get object inspector from Serde " + getSerde().getClass().getName(), e); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java index 571e076588..9da1cda14b 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java @@ -68,7 +68,7 @@ public BucketIdResolverImpl(ObjectInspector objectInspector, int recordIdColumn, @Override public Object attachBucketIdToRecord(Object record) { int bucketId = computeBucketId(record); - RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID); + RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, bucketId, INVALID_ROW_ID); structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier); return record; } diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java index 1ad0842d98..a59c606497 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -228,7 +228,7 @@ private void resetMutator(int newBucketId, List newPartitionValues, Path // TODO: Should this be the concern of the mutator? 
deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId); } - mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId); + mutator = mutatorFactory.newMutator(outputFormat, table, newPartitionPath, newBucketId); bucketId = newBucketId; partitionValues = newPartitionValues; partitionPath = newPartitionPath; diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java index 22cd9b747a..54eabbd69d 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java @@ -21,10 +21,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; public interface MutatorFactory { - Mutator newMutator(AcidOutputFormat outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException; + Mutator newMutator(AcidOutputFormat outputFormat, AcidTable table, Path partitionPath, int bucketId) throws IOException; RecordInspector newRecordInspector(); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java index 8998de99e9..b8b0193108 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java @@ -32,19 +32,22 @@ private final long transactionId; private final Path partitionPath; private final int bucketId; + private final int numBuckets; private final Configuration configuration; private final int recordIdColumn; private final ObjectInspector objectInspector; private RecordUpdater updater; public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector, - AcidOutputFormat outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException { + AcidOutputFormat outputFormat, long transactionId, Path partitionPath, int bucketId, + int numBuckets) throws IOException { this.configuration = configuration; this.recordIdColumn = recordIdColumn; this.objectInspector = objectInspector; this.transactionId = transactionId; this.partitionPath = partitionPath; this.bucketId = bucketId; + this.numBuckets = numBuckets; updater = createRecordUpdater(outputFormat); } @@ -97,7 +100,8 @@ protected RecordUpdater createRecordUpdater(AcidOutputFormat outputFormat) .maximumTransactionId(transactionId) .recordIdColumn(recordIdColumn) .finalDestination(partitionPath) - .statementId(-1)); + .statementId(-1) + .numBuckets(numBuckets)); } } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java index e057da7025..e39aaedc9f 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable; import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolver; import org.apache.hive.hcatalog.streaming.mutate.worker.BucketIdResolverImpl; import org.apache.hive.hcatalog.streaming.mutate.worker.Mutator; @@ -49,10 +50,11 @@ public ReflectiveMutatorFactory(Configuration configuration, Class recordClas } @Override - public Mutator newMutator(AcidOutputFormat outputFormat, long transactionId, Path partitionPath, int bucketId) - throws IOException { - return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, transactionId, partitionPath, - bucketId); + public Mutator newMutator(AcidOutputFormat outputFormat, AcidTable table, + Path partitionPath, int bucketId) throws IOException { + return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, + table.getTransactionId(), partitionPath, bucketId, table.getTotalBuckets()); + } @Override diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index 686767908a..197996ebc2 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -148,7 +148,7 @@ public void assertMaxTransactionId(long expectedMaxTransactionId) { while (recordReader.next(key, value)) { RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier(); Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(), - recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString()); + recordIdentifier.getBucketId(), recordIdentifier.getWriterId(), recordIdentifier.getRowId()), value.toString()); System.out.println(record); records.add(record); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java index f1de1dfe4c..a0721726e0 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java @@ -242,7 +242,7 @@ public void testTransactionBatchCommitPartitioned() throws Exception { List readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 0L))); assertThat(transaction.getState(), is(COMMITTED)); client.close(); @@ -299,7 +299,7 @@ public void testMulti() throws Exception { List readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 0L))); // EUROPE_UK streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); @@ -310,7 +310,7 @@ public void testMulti() throws Exception { readRecords = streamingAssertions.readRecords(); 
assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 0L))); // EUROPE_FRANCE streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); @@ -321,9 +321,9 @@ public void testMulti() throws Exception { readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(2)); assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 0L))); assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}")); - assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 1L))); client.close(); } @@ -369,7 +369,7 @@ public void testTransactionBatchCommitUnpartitioned() throws Exception { List readRecords = streamingAssertions.readRecords(); assertThat(readRecords.size(), is(1)); assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}")); - assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 0L))); assertThat(transaction.getState(), is(COMMITTED)); client.close(); @@ -499,13 +499,13 @@ public void testUpdatesAndDeletes() throws Exception { "Namaste streaming 3")); mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L, - 0, 1L))); + 0, 0, 1L))); mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3); - mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L))); + mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0, 0L))); mutateCoordinator.delete(EUROPE_FRANCE, - new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L))); + new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0, 0L))); mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier( - 1L, 0, 1L))); + 1L, 0, 0, 1L))); mutateCoordinator.close(); mutateTransaction.commit(); @@ -518,11 +518,11 @@ public void testUpdatesAndDeletes() throws Exception { List indiaRecords = indiaAssertions.readRecords(); assertThat(indiaRecords.size(), is(3)); assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}")); - assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L))); + assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 0L))); assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}")); - assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 1L))); assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}")); - assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L))); + assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0, 0L))); StreamingAssert ukAssertions = 
assertionFactory.newStreamingAssert(table, EUROPE_UK); ukAssertions.assertMinTransactionId(1L); @@ -530,7 +530,7 @@ public void testUpdatesAndDeletes() throws Exception { List ukRecords = ukAssertions.readRecords(); assertThat(ukRecords.size(), is(1)); assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}")); - assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 1L))); StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); franceAssertions.assertMinTransactionId(1L); @@ -538,7 +538,7 @@ public void testUpdatesAndDeletes() throws Exception { List franceRecords = franceAssertions.readRecords(); assertThat(franceRecords.size(), is(1)); assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}")); - assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L))); + assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0, 1L))); client.close(); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java index 437946b0c6..fa8806482f 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java @@ -40,7 +40,7 @@ public void testAttachBucketIdToRecord() { MutableRecord record = new MutableRecord(1, "hello"); capturingBucketIdResolver.attachBucketIdToRecord(record); - assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L))); + assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, 1, -1L))); assertThat(record.id, is(1)); assertThat(record.msg.toString(), is("hello")); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java index d8974777bd..cb0d1a9ce7 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java @@ -54,10 +54,10 @@ private static final Path PATH_A = new Path("X"); private static final Path PATH_B = new Path("B"); private static final Object RECORD = "RECORD"; - private static final RecordIdentifier ROW__ID_B0_R0 = new RecordIdentifier(10L, BUCKET_ID, 0L); - private static final RecordIdentifier ROW__ID_B0_R1 = new RecordIdentifier(10L, BUCKET_ID, 1L); - private static final RecordIdentifier ROW__ID_B1_R0 = new RecordIdentifier(10L, BUCKET_ID + 1, 0L); - private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L); + private static final RecordIdentifier ROW__ID_B0_R0 = new RecordIdentifier(10L, BUCKET_ID, BUCKET_ID, 0L); + private static final RecordIdentifier ROW__ID_B0_R1 = new RecordIdentifier(10L, BUCKET_ID, BUCKET_ID, 1L); + private static final RecordIdentifier ROW__ID_B1_R0 = new RecordIdentifier(10L, BUCKET_ID + 1, BUCKET_ID + 1, 0L); + private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, BUCKET_ID, -1L); @Mock private MutatorFactory mockMutatorFactory; @@ -88,7 +88,7 @@ public void 
createCoordinator() throws Exception { when(mockAcidTable.createPartitions()).thenReturn(true); when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector); when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver); - when(mockMutatorFactory.newMutator(any(OrcOutputFormat.class), anyLong(), any(Path.class), anyInt())).thenReturn( + when(mockMutatorFactory.newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), any(Path.class), anyInt())).thenReturn( mockMutator); when(mockPartitionHelper.getPathForPartition(any(List.class))).thenReturn(PATH_A); when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_INSERT); @@ -104,7 +104,7 @@ public void insert() throws Exception { coordinator.insert(UNPARTITIONED, RECORD); verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutator).insert(RECORD); } @@ -115,7 +115,7 @@ public void multipleInserts() throws Exception { coordinator.insert(UNPARTITIONED, RECORD); verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutator, times(3)).insert(RECORD); } @@ -129,8 +129,8 @@ public void insertPartitionChanges() throws Exception { verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A); verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_B), eq(BUCKET_ID)); verify(mockMutator, times(2)).insert(RECORD); } @@ -143,9 +143,9 @@ public void bucketChanges() throws Exception { coordinator.update(UNPARTITIONED, RECORD); coordinator.delete(UNPARTITIONED, RECORD); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutatorFactory) - .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1)); + .newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID + 1)); verify(mockMutator).update(RECORD); verify(mockMutator).delete(RECORD); } @@ -166,11 +166,11 @@ public void partitionThenBucketChanges() throws Exception { coordinator.update(PARTITION_B, RECORD); /* PbB1 */ verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); - verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); + 
verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_B), eq(BUCKET_ID)); verify(mockMutatorFactory) - .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1)); + .newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_B), eq(BUCKET_ID + 1)); verify(mockMutator, times(2)).update(RECORD); verify(mockMutator).delete(RECORD); verify(mockMutator).insert(RECORD); @@ -197,7 +197,7 @@ public void outOfSequence() throws Exception { coordinator.delete(UNPARTITIONED, RECORD); verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutator).update(RECORD); verify(mockMutator).delete(RECORD); } @@ -210,7 +210,7 @@ public void revisitGroup() throws Exception { coordinator.delete(UNPARTITIONED, RECORD); verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED); - verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID)); + verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(mockAcidTable), eq(PATH_A), eq(BUCKET_ID)); verify(mockMutator).update(RECORD); verify(mockMutator).delete(RECORD); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java index 9aeeb312d9..2472ace7ec 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java @@ -48,6 +48,7 @@ private static final Object RECORD = new Object(); private static final int RECORD_ID_COLUMN = 2; private static final int BUCKET_ID = 0; + private static final int NUM_BUCKETS = 2; private static final Path PATH = new Path("X"); private static final long TRANSACTION_ID = 1L; @@ -68,7 +69,7 @@ public void injectMocks() throws IOException { when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater); mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, TRANSACTION_ID, - PATH, BUCKET_ID); + PATH, BUCKET_ID, NUM_BUCKETS); } @Test @@ -76,6 +77,7 @@ public void testCreatesRecordReader() throws IOException { verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture()); Options options = captureOptions.getValue(); assertThat(options.getBucket(), is(BUCKET_ID)); + assertThat(options.getNumBuckets(), is(NUM_BUCKETS)); assertThat(options.getConfiguration(), is((Configuration) configuration)); assertThat(options.getInspector(), is(mockObjectInspector)); assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN)); diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java index 55da312a5a..214022219e 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestRecordInspectorImpl.java @@ -35,7 +35,7 @@ @Test public void 
testExtractRecordIdentifier() { - RecordIdentifier recordIdentifier = new RecordIdentifier(10L, 4, 20L); + RecordIdentifier recordIdentifier = new RecordIdentifier(10L, 4, 4, 20L); MutableRecord record = new MutableRecord(1, "hello", recordIdentifier); assertThat(inspector.extractRecordIdentifier(record), is(recordIdentifier)); } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java index 2b3f79fcc0..44075db845 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestSequenceValidator.java @@ -31,73 +31,73 @@ @Test public void testSingleInSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); } @Test public void testRowIdInSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 4)), is(true)); } @Test public void testTxIdInSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, BUCKET_ID, 0)), is(true)); } @Test public void testMixedInSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 1)), is(true)); } @Test public void testNegativeTxId() { - assertThat(validator.isInSequence(new RecordIdentifier(-1L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(-1L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); } @Test public void testNegativeRowId() { - assertThat(validator.isInSequence(new 
RecordIdentifier(0L, BUCKET_ID, -1)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, -1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); } @Test public void testRowIdOutOfSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(false)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 4)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 1)), is(false)); } @Test public void testReset() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 4)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 4)), is(true)); // New partition for example validator.reset(); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 1)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 1)), is(true)); } @Test public void testTxIdOutOfSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(4L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 0)), is(false)); } @Test public void testMixedOutOfSequence() { - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 0)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 4)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 0)), is(false)); - assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, 5)), is(true)); - assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, 6)), is(false)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 0)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 4)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 0)), is(false)); + assertThat(validator.isInSequence(new RecordIdentifier(1L, BUCKET_ID, BUCKET_ID, 5)), is(true)); + assertThat(validator.isInSequence(new RecordIdentifier(0L, BUCKET_ID, BUCKET_ID, 6)), is(false)); } @Test(expected = NullPointerException.class) diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 9aced9fc02..3575dc8584 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1203,6 +1203,8 @@ private int acquireLocks() { //it's possible 
to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes desc.setStatementId(txnMgr.getWriteIdAndIncrement()); + /*so if we have > 1 FS in the same plan we may be writing to the same partition in parallel*/ + desc.setNumBuckets(); } } /*It's imperative that {@code acquireLocks()} is called for all commands so that diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 3e0943251e..0585cd4292 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -769,6 +769,7 @@ public void process(Object row, int tag) throws HiveException { ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector; Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField); int bucketNum = +// bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)) % conf.getNumBuckets();//todo: move to JavaUtils or something bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)); if (fpaths.acidLastBucket != bucketNum) { fpaths.acidLastBucket = bucketNum; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 7c7074d156..1186478a40 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -209,6 +209,7 @@ public Reporter getReporter() { public ObjectInspector getObjectInspector(); public boolean isDelete(V value); + int getNumBuckets(); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827424..d3d90a1601 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; @@ -51,6 +52,8 @@ private long minimumTransactionId; private long maximumTransactionId; private int bucket; + //"total number of buckets from cluster by (x) into N buckets" + private int numBuckets = OrcRecordUpdater.BC_UNKNOWN; private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -178,7 +181,10 @@ public Options bucket(int bucket) { this.bucket = bucket; return this; } - + public Options numBuckets(int numBuckets) { + this.numBuckets = numBuckets; + return this; + } /** * Whether it should use the old style (0000000_0) filenames. 
* @param value should use the old style names @@ -278,6 +284,7 @@ public boolean isWritingDeleteDelta() { public int getBucket() { return bucket; } + public int getNumBuckets() { return numBuckets; } public int getRecordIdColumn() { return recIdCol; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index da00bb3363..1640c7b38b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; @@ -1177,4 +1178,9 @@ public static AcidOperationalProperties getAcidOperationalProperties( } return AcidOperationalProperties.parseString(resultStr); } + public static int getBucketId(int writerId, int numBuckets) { + assert writerId >= 0 && (numBuckets == OrcRecordUpdater.BC_UNKNOWN || numBuckets >= 0) : + "writerId=" + writerId + " numBuckets=" + numBuckets; + return numBuckets > 0 ? writerId % numBuckets : writerId; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index cc69c7eab8..c69d38be3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -344,6 +344,7 @@ private static RecordUpdater getRecordUpdater(JobConf jc, .minimumTransactionId(conf.getTransactionId()) .maximumTransactionId(conf.getTransactionId()) .bucket(bucket) + .numBuckets(conf.getNumBuckets()) .inspector(inspector) .recordIdColumn(rowIdColNum) .statementId(conf.getStatementId()) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 7f2c169ce5..82bf786e95 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -48,6 +48,7 @@ transactionId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector), bucketId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), + writerId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), rowId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector); public final TypeInfo fieldType; public final ObjectInspector fieldOI; @@ -90,20 +91,29 @@ public static void toArray(RecordIdentifier ri, Object[] struct) { } struct[Field.transactionId.ordinal()] = ri.getTransactionId(); struct[Field.bucketId.ordinal()] = ri.getBucketId(); + struct[Field.writerId.ordinal()] = ri.getWriterId(); struct[Field.rowId.ordinal()] = ri.getRowId(); } } - private long transactionId; + /** + * Invariant: writerId % (# buckets) == bucketId for bucketed tables + * Invariant: writerId == bucketId for non-bucketed tables + */ private int bucketId; + /** + * {@link org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater#writerId} + */ + private int writerId; private long rowId; public RecordIdentifier() { } - public RecordIdentifier(long transactionId, int bucket, long rowId) { + public 
RecordIdentifier(long transactionId, int bucket, int writerId, long rowId) { this.transactionId = transactionId; this.bucketId = bucket; + this.writerId = writerId; this.rowId = rowId; } @@ -113,9 +123,10 @@ public RecordIdentifier(long transactionId, int bucket, long rowId) { * @param bucketId the bucket id * @param rowId the row id */ - public void setValues(long transactionId, int bucketId, long rowId) { + public void setValues(long transactionId, int bucketId, int writerId, long rowId) { this.transactionId = transactionId; this.bucketId = bucketId; + this.writerId = writerId; this.rowId = rowId; } @@ -126,6 +137,7 @@ public void setValues(long transactionId, int bucketId, long rowId) { public void set(RecordIdentifier other) { this.transactionId = other.transactionId; this.bucketId = other.bucketId; + this.writerId = other.writerId; this.rowId = other.rowId; } @@ -148,6 +160,9 @@ public long getTransactionId() { public int getBucketId() { return bucketId; } + public int getWriterId() { + return writerId; + } /** * What was the original row id for the last row? @@ -167,6 +182,9 @@ protected int compareToInternal(RecordIdentifier other) { if (bucketId != other.bucketId) { return bucketId < other.bucketId ? - 1 : 1; } + if (writerId != other.writerId) { + return writerId < other.writerId ? -1 : 1; + } if (rowId != other.rowId) { return rowId < other.rowId ? -1 : 1; } @@ -185,6 +203,7 @@ public int compareTo(RecordIdentifier other) { public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(transactionId); dataOutput.writeInt(bucketId); + dataOutput.writeInt(writerId); dataOutput.writeLong(rowId); } @@ -192,6 +211,7 @@ public void write(DataOutput dataOutput) throws IOException { public void readFields(DataInput dataInput) throws IOException { transactionId = dataInput.readLong(); bucketId = dataInput.readInt(); + writerId = dataInput.readInt(); rowId = dataInput.readLong(); } @@ -205,21 +225,22 @@ public boolean equals(Object other) { } RecordIdentifier oth = (RecordIdentifier) other; return oth.transactionId == transactionId && - oth.bucketId == bucketId && + oth.writerId == writerId &&//if writerIds are ==, bucketId must be as well oth.rowId == rowId; } @Override public int hashCode() { int result = 17; result = 31 * result + (int)(transactionId ^ (transactionId >>> 32)); - result = 31 * result + bucketId; + //ignore bucketId since bucketId = f(writerId) + result = 31 * result + writerId; result = 31 * result + (int)(rowId ^ (rowId >>> 32)); return result; } @Override public String toString() { - return "{originalTxn: " + transactionId + ", bucket: " + - bucketId + ", row: " + getRowId() + "}"; + return "{originalTxn: " + transactionId + ", bucket: " + bucketId + ", writer: " + + writerId + ", row: " + getRowId() + "}"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index a179300919..6465777763 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -296,7 +296,7 @@ public RecordUpdater getRecordUpdater(Path path, .rowIndexStride(0); } final OrcRecordUpdater.KeyIndexBuilder watcher = - new OrcRecordUpdater.KeyIndexBuilder(); + new OrcRecordUpdater.KeyIndexBuilder(options.getNumBuckets()); opts.inspector(options.getInspector()) .callback(watcher); final Writer writer = OrcFile.createWriter(filename, opts); @@ -308,7 +308,7 @@ public void write(Writable w) throws IOException 
{ ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(), ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), - ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), + ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(),//this is now the writer I hope ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); writer.addRow(w); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806e70..84024f5fd6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -66,6 +65,12 @@ private RecordIdentifier maxKey; // an extra value so that we can return it while reading ahead private OrcStruct extraValue; + private final int numBuckets; + @Override + public int getNumBuckets() { + return numBuckets; + } + /** * A RecordIdentifier extended with the current transaction id. This is the @@ -80,19 +85,19 @@ private int statementId;//sort on this descending, like currentTransactionId public ReaderKey() { - this(-1, -1, -1, -1, 0); + this(-1, -1, -1, -1, -1, 0); } - public ReaderKey(long originalTransaction, int bucket, long rowId, + public ReaderKey(long originalTransaction, int bucket, int writerId, long rowId, long currentTransactionId) { - this(originalTransaction, bucket, rowId, currentTransactionId, 0); + this(originalTransaction, bucket, writerId, rowId, currentTransactionId, 0); } /** * @param statementId - set this to 0 if N/A */ - public ReaderKey(long originalTransaction, int bucket, long rowId, + public ReaderKey(long originalTransaction, int bucket, int writerId, long rowId, long currentTransactionId, int statementId) { - super(originalTransaction, bucket, rowId); + super(originalTransaction, bucket, writerId, rowId); this.currentTransactionId = currentTransactionId; this.statementId = statementId; } @@ -106,10 +111,11 @@ public void set(RecordIdentifier other) { public void setValues(long originalTransactionId, int bucket, + int writerId, long rowId, long currentTransactionId, int statementId) { - setValues(originalTransactionId, bucket, rowId); + setValues(originalTransactionId, bucket, writerId, rowId); this.currentTransactionId = currentTransactionId; this.statementId = statementId; } @@ -190,8 +196,12 @@ public String toString() { final ReaderKey key; final RecordIdentifier maxKey; final int bucket; + private final int numBuckets; private final int statementId; + private int getNumBuckets() { + return numBuckets; + } /** * Create a reader that reads from the first key larger than minKey to any * keys equal to maxKey. 
@@ -212,6 +222,7 @@ public String toString() { this.key = key; this.maxKey = maxKey; this.bucket = bucket; + this.numBuckets = OrcRecordUpdater.parseBucketCount(reader); // TODO use stripe statistics to jump over stripes recordReader = reader.rowsOptions(options); this.statementId = statementId; @@ -225,9 +236,11 @@ public String toString() { void next(OrcStruct next) throws IOException { if (recordReader.hasNext()) { nextRecord = (OrcStruct) recordReader.next(next); + int writerId = OrcRecordUpdater.getWriterId(nextRecord); // set the key key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord), - OrcRecordUpdater.getBucket(nextRecord), + AcidUtils.getBucketId(writerId, numBuckets), + writerId, OrcRecordUpdater.getRowId(nextRecord), OrcRecordUpdater.getCurrentTransaction(nextRecord), statementId); @@ -277,8 +290,8 @@ void next(OrcStruct next) throws IOException { new LongWritable(0)); nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, new LongWritable(0)); - nextRecord.setFieldValue(OrcRecordUpdater.BUCKET, - new IntWritable(bucket)); + nextRecord.setFieldValue(OrcRecordUpdater.BUCKET,//this is actually writerId + new IntWritable(bucket));//this is 'original' so writerId = bucketId - todo: name change nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(nextRowId)); nextRecord.setFieldValue(OrcRecordUpdater.ROW, @@ -290,7 +303,7 @@ void next(OrcStruct next) throws IOException { ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) .set(0); ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) - .set(bucket); + .set(bucket);//this is 'original' so writerId = bucketId - todo: name change ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) .set(0); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) @@ -298,7 +311,7 @@ void next(OrcStruct next) throws IOException { nextRecord.setFieldValue(OrcRecordUpdater.ROW, recordReader.next(OrcRecordUpdater.getRow(next))); } - key.setValues(0L, bucket, nextRowId, 0L, 0); + key.setValues(0L, bucket, bucket, nextRowId, 0L, 0); if (maxKey != null && key.compareRow(maxKey) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + maxKey); @@ -352,11 +365,13 @@ private void discoverOriginalKeyBounds(Reader reader, int bucket, break; } } + //writerId == bucketId below since we assume the the "original" files were written such that + //number of reducers (writers) <= num buckets if (rowOffset > 0) { - minKey = new RecordIdentifier(0, bucket, rowOffset - 1); + minKey = new RecordIdentifier(0, bucket, bucket, rowOffset - 1); } if (!isTail) { - maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1); + maxKey = new RecordIdentifier(0, bucket, bucket, rowOffset + rowLength - 1); } } @@ -439,7 +454,8 @@ private void discoverKeyBounds(Reader reader, this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; - + int numBuckets = OrcRecordUpdater.BC_UNKNOWN; + TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); @@ -476,6 +492,7 @@ private void discoverKeyBounds(Reader reader, readers.put(key, pair); } baseReader = pair.recordReader; + numBuckets = pair.getNumBuckets(); } // we always want to read all of the deltas @@ -507,10 +524,11 @@ private void discoverKeyBounds(Reader reader, if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } + numBuckets = getNumBuckets(deltaPair, numBuckets, deltaFile); } } } - + this.numBuckets = numBuckets; // get 
the first record Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry(); if (entry == null) { @@ -528,6 +546,22 @@ private void discoverKeyBounds(Reader reader, } } + /** + * Base/delta may or may not predate {@link OrcRecordUpdater#ACID_BUCKET_COUNT} so + * this figures out numBuckets across all files and asserts that all files agree on a value. + */ + private int getNumBuckets(ReaderPair deltaPair, int numBuckets, Path deltaFile) { + if(numBuckets == OrcRecordUpdater.BC_UNKNOWN) { + return deltaPair.getNumBuckets(); + } + else { + if(numBuckets != deltaPair.getNumBuckets()) { + throw new IllegalStateException(deltaFile + " has " + OrcRecordUpdater.ACID_BUCKET_COUNT + + "=" + deltaPair.getNumBuckets() + " but other deltas or base has " + numBuckets); + } + } + return numBuckets; + } @VisibleForTesting RecordIdentifier getMinKey() { return minKey; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 65f4a24750..935c658dc6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -48,16 +48,26 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** * A RecordUpdater where the files are stored as ORC. + * + * A note on various record structures: the {@code row} coming in (as in {@link #insert(long, Object)} + * for example), is a struct like <RecordIdentifier, f1, ... fn> but what is written to the file + * is <op, otid, writerId, rowid, ctid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)}) + * So there are OIs here to make the translation. */ public class OrcRecordUpdater implements RecordUpdater { private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class); public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; + /** + * For bucketed tables, this is the number of buckets in the table. + * What should this be for un-bucketed? 0? -1? + */ + static final String ACID_BUCKET_COUNT = "hive.acid.bucket.count"; + public static final int BC_UNKNOWN = -1; + public static final int BC_UNBUCKETED = 0; public static final String ACID_FORMAT = "_orc_acid_version"; public static final int ORC_ACID_VERSION = 0; @@ -68,7 +78,7 @@ final static int OPERATION = 0; final static int ORIGINAL_TRANSACTION = 1; - final static int BUCKET = 2; + final static int BUCKET = 2;//todo: make WRITER_ID final static int ROW_ID = 3; final static int CURRENT_TRANSACTION = 4; final static int ROW = 5; @@ -93,24 +103,52 @@ private final IntWritable operation = new IntWritable(); private final LongWritable currentTransaction = new LongWritable(-1); private final LongWritable originalTransaction = new LongWritable(-1); - private final IntWritable bucket = new IntWritable(); + /** + * This is the bucket id used in the data file name. This is a calculated value - not written to + * the data file. + * {@link RecordIdentifier.Field#bucketId} + * Invariant: (0 <= bucketId < numBuckets) + */ + private final int bucketId; + /** + * This is the logical ID of the writer. + * {@link RecordIdentifier.Field#writerId} + * Invariant: (writerId % numBuckets == bucketId) + */ + private final int writerId; + /** + * N in "CLUSTER BY (...) INTO N BUCKETS" + * {@link #BC_UNBUCKETED} if not bucketed (not supported yet) + */ + private final int numBuckets; + /** + * this contains the value for {@link RecordIdentifier.Field#writerId}. For INSERT it should + * always be {@link #writerId}. 
For UPDATE/DELETE it should always come from the row + * being modified. + * Invariant: (writerID.get() % numBuckets == bucketId) + */ + private final IntWritable writerID = new IntWritable(); private final LongWritable rowId = new LongWritable(); + /** + * per txn; used to generate RecordIdentifier.rowId + */ private long insertedRows = 0; private long rowIdOffset = 0; // This records how many rows have been inserted or deleted. It is separate from insertedRows // because that is monotonically increasing to give new unique row ids. private long rowCountDelta = 0; - private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private final KeyIndexBuilder indexBuilder; private KeyIndexBuilder deleteEventIndexBuilder; - private StructField recIdField = null; // field to look for the record identifier in + private StructField recordIdentifierField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in private StructField originalTxnField = null; // field inside recId to look for original txn in - private StructField bucketField = null; // field inside recId to look for bucket in + private StructField writerIdField = null; // field inside recId to look for writerId in private StructObjectInspector rowInspector; // OI for the original row - private StructObjectInspector recIdInspector; // OI for the record identifier struct + private StructObjectInspector recordIdentifierInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier private LongObjectInspector origTxnInspector; // OI for the original txn inside the record - // identifer + // identifier + private IntObjectInspector writerIdInspector; static int getOperation(OrcStruct struct) { return ((IntWritable) struct.getFieldValue(OPERATION)).get(); @@ -124,7 +162,7 @@ static long getOriginalTransaction(OrcStruct struct) { return ((LongWritable) struct.getFieldValue(ORIGINAL_TRANSACTION)).get(); } - static int getBucket(OrcStruct struct) { + static int getWriterId(OrcStruct struct) { return ((IntWritable) struct.getFieldValue(BUCKET)).get(); } @@ -174,7 +212,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { fields.add(new OrcStruct.Field("originalTransaction", PrimitiveObjectInspectorFactory.writableLongObjectInspector, ORIGINAL_TRANSACTION)); - fields.add(new OrcStruct.Field("bucket", + fields.add(new OrcStruct.Field("bucket",//todo: is this safe to change? re: reading old files - it changes metadata in the file - evidently not.... 
see SchemaEvolution.checkAcidSchema() PrimitiveObjectInspectorFactory.writableIntObjectInspector, BUCKET)); fields.add(new OrcStruct.Field("rowId", PrimitiveObjectInspectorFactory.writableLongObjectInspector, ROW_ID)); @@ -200,7 +238,19 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { this.acidOperationalProperties = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); } - this.bucket.set(options.getBucket()); + numBuckets = options.getNumBuckets(); + indexBuilder = new KeyIndexBuilder(numBuckets); + bucketId = options.getBucket(); + assert numBuckets > 0 && numBuckets > bucketId : + "bucketId=" + bucketId + " numBuckets=" + numBuckets + + " statementId=" + options.getStatementId() + " path=" + path; + + writerId = options.getStatementId() >=0 ?//todo: comment this + bucketId + numBuckets * options.getStatementId() : bucketId; + //writerId = options.getBucket();//todo: remove this - just for testing + assert writerId >= 0 && writerId % numBuckets == bucketId : + "lb=" + writerId + " b=" + bucketId + " stId=" + options.getStatementId(); + this.writerID.set(writerId); this.path = AcidUtils.createFilename(path, options); this.deleteEventWriter = null; this.deleteEventPath = null; @@ -221,7 +271,8 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { } } if (options.getMinimumTransactionId() != options.getMaximumTransactionId() - && !options.isWritingBase()){ + && !options.isWritingBase()){//compactor doesn't use OrcRecordUpdater so if min != max + // this must be called from Streaming Ingest API flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8, options.getReporter()); } else { @@ -275,7 +326,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(OPERATION, operation); item.setFieldValue(CURRENT_TRANSACTION, currentTransaction); item.setFieldValue(ORIGINAL_TRANSACTION, originalTransaction); - item.setFieldValue(BUCKET, bucket); + item.setFieldValue(BUCKET, writerID); item.setFieldValue(ROW_ID, rowId); } @@ -287,7 +338,6 @@ public String toString() { * To handle multiple INSERT... statements in a single transaction, we want to make sure * to generate unique {@code rowId} for all inserted rows of the transaction. * @return largest rowId created by previous statements (maybe 0) - * @throws IOException */ private long findRowIdOffsetForInsert() throws IOException { /* @@ -330,19 +380,22 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { } else { RecIdStrippingObjectInspector newInspector = new RecIdStrippingObjectInspector(inspector, rowIdColNum); - recIdField = newInspector.getRecId(); - List fields = - ((StructObjectInspector) recIdField.getFieldObjectInspector()).getAllStructFieldRefs(); + recordIdentifierField = newInspector.getRecId();//this is the RecordIdentifier + List fields = + ((StructObjectInspector) recordIdentifierField.getFieldObjectInspector()) + .getAllStructFieldRefs(); // Go by position, not field name, as field names aren't guaranteed. 
The order of fields // in RecordIdentifier is transactionId, bucketId, rowId originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); - bucketField = fields.get(1); - rowIdField = fields.get(2); + writerIdField = fields.get(2); + writerIdInspector = (IntObjectInspector) writerIdField.getFieldObjectInspector(); + rowIdField = fields.get(3); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); - recIdInspector = (StructObjectInspector) recIdField.getFieldObjectInspector(); + recordIdentifierInspector = (StructObjectInspector) + recordIdentifierField.getFieldObjectInspector(); return newInspector; } } @@ -355,23 +408,39 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId, // it will be reset by the following if anyway. long originalTransaction = currentTransaction; if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { - Object rowIdValue = rowInspector.getStructFieldData(row, recIdField); - originalTransaction = origTxnInspector.get( - recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); - rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); + Object recordIdentifier = rowInspector.getStructFieldData(row, recordIdentifierField); + originalTransaction = origTxnInspector.get(recordIdentifierInspector + .getStructFieldData(recordIdentifier, originalTxnField)); + rowId = rowIdInspector.get(recordIdentifierInspector + .getStructFieldData(recordIdentifier, rowIdField)); + /** + * If next line throws, something in the shuffle went wrong - i.e. this row belongs to another + * bucket. + * + * When handling update/delete all rows with a given + * {@link RecordIdentifier#bucketId} will shuffle to the same writer but the + * {@link RecordIdentifier#writerId} may be different; + * in particular, it may be different from this {@link #writerId} + */ + setWriterID(operation, writerIdInspector.get(recordIdentifierInspector + .getStructFieldData(recordIdentifier, writerIdField)), this.path); } else if(operation == INSERT_OPERATION) { + assertInvariants(this.path);//if this fails, restoreWriterID() wasn't called somewhere rowId += rowIdOffset; } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? 
null : row)); - indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); + updateIndex(indexBuilder, operation, originalTransaction, rowId); if (writer == null) { writer = OrcFile.createWriter(path, writerOptions); } writer.addRow(item); + if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + restoreWriterID(operation); + } } private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) @@ -383,17 +452,19 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro } this.operation.set(operation); this.currentTransaction.set(currentTransaction); - Object rowValue = rowInspector.getStructFieldData(row, recIdField); + Object recordIdentifier = rowInspector.getStructFieldData(row, recordIdentifierField); long originalTransaction = origTxnInspector.get( - recIdInspector.getStructFieldData(rowValue, originalTxnField)); + recordIdentifierInspector.getStructFieldData(recordIdentifier, originalTxnField)); rowId = rowIdInspector.get( - recIdInspector.getStructFieldData(rowValue, rowIdField)); + recordIdentifierInspector.getStructFieldData(recordIdentifier, rowIdField)); if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + setWriterID(operation, writerIdInspector.get(recordIdentifierInspector + .getStructFieldData(recordIdentifier, writerIdField)), deleteEventPath); // Initialize a deleteEventWriter if not yet done. (Lazy initialization) if (deleteEventWriter == null) { // Initialize an indexBuilder for deleteEvents. - deleteEventIndexBuilder = new KeyIndexBuilder(); + deleteEventIndexBuilder = new KeyIndexBuilder(numBuckets); // Change the indexBuilder callback too for the deleteEvent file, the remaining writer // options remain the same. @@ -412,8 +483,9 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. 
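For reference, the writerId scheme asserted above (writerId = bucketId + numBuckets * statementId when statementId >= 0, otherwise writerId = bucketId, so writerId % numBuckets == bucketId always holds) can be reduced to a small standalone sketch. It only illustrates that arithmetic under the stated assumption; WriterIdCodec and its method names are hypothetical and are not part of this patch.

/** Illustrative sketch of the writerId encoding asserted above; not part of the patch. */
final class WriterIdCodec {

  /** statementId < 0 means a single-statement transaction; the bucketId is used directly. */
  static int encode(int bucketId, int numBuckets, int statementId) {
    if (numBuckets <= 0 || bucketId < 0 || bucketId >= numBuckets) {
      throw new IllegalArgumentException("bucketId=" + bucketId + " numBuckets=" + numBuckets);
    }
    return statementId >= 0 ? bucketId + numBuckets * statementId : bucketId;
  }

  /** Recovers the bucket, mirroring the writerId % numBuckets == bucketId invariant. */
  static int decodeBucket(int writerId, int numBuckets) {
    return writerId % numBuckets;
  }

  public static void main(String[] args) {
    int writerId = encode(3, 8, 2);                 // bucket 3 of 8, third statement in the txn
    System.out.println(writerId);                   // 19
    System.out.println(decodeBucket(writerId, 8));  // 3
  }
}

Under this scheme, two INSERT legs of the same transaction get different writerIds (different statementIds) while still mapping back to the same bucket, which is consistent with the duplicate-ROW__ID scenario the multi-insert tests later in this patch guard against (HIVE-16832).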
- deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); + updateIndex(deleteEventIndexBuilder, DELETE_OPERATION, originalTransaction, rowId); deleteEventWriter.addRow(item); + restoreWriterID(operation); } if (operation == UPDATE_OPERATION) { @@ -422,6 +494,35 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro } } + /** + * If these don't match we'll have data loss on read - make the write fail fast + * @param writerPath - target file to make error msg more meaningful + */ + private void assertInvariants(Path writerPath) { + if(!(bucketId == writerID.get() % numBuckets)) { + throw new IllegalStateException("bucketId=" + bucketId + + " writerID.get()=" + writerID.get() + " numBuckets=" + numBuckets + + " writerId=" + writerId + " for " + writerPath); + } + } + /** + * All calls to this method must have a matching call to {@link #restoreWriterID(int)} + */ + private void setWriterID(int operation, int id, Path writerPath) {//todo: rename + assert operation == UPDATE_OPERATION || operation == DELETE_OPERATION + :"Unexpected operation: " + operation; + writerID.set(id); + assertInvariants(writerPath); + } + private void restoreWriterID(int operation) {//todo: rename this to writerId + assert operation == UPDATE_OPERATION || operation == DELETE_OPERATION + :"Unexpected operation: " + operation; + writerID.set(writerId); + } + private void updateIndex(KeyIndexBuilder index, int operation, long originalTxnId, long rowId) { + index.addKey(operation, originalTxnId, writerID.get(), rowId); + } + @Override public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { @@ -532,8 +633,32 @@ public SerDeStats getStats() { private static final Charset utf8 = Charset.forName("UTF-8"); private static final CharsetDecoder utf8Decoder = utf8.newDecoder(); + /** + * Returns -1 to indicate that writerId == bucketId should hold for all rows. + * This happens in 2 cases: + * 1. If {@code reader} is a proper acid file and the {@link #ACID_BUCKET_COUNT} property is not + * there, in which case it must have been written before this property was introduced. So the + * assumption is that it's a properly bucketed table. + * 2. If {@code reader} is an "original" file (as in non-acid to acid conversion) then by definition + * it has no {@link #ACID_BUCKET_COUNT} metadata. Currently we expect this to be a properly bucketed table. In the + * future we'll support non-bucketed tables and we'll still have writerId == bucketId.
+ * Returns 0 to indicate that table is not bucketed => writerId == bucketId + * Returns N to indicate the number of buckets the table has and so writerId % N == bucketId + */ + static int parseBucketCount(Reader reader) { + try { + if(reader.hasMetadataValue(ACID_BUCKET_COUNT)) { + ByteBuffer val2 = reader.getMetadataValue(ACID_BUCKET_COUNT); + return Integer.parseInt(utf8Decoder.decode(val2).toString()); + } + } catch (CharacterCodingException e) { + throw new IllegalArgumentException("Bad string encoding for " + ACID_BUCKET_COUNT, e); + } + return BC_UNKNOWN; + } static RecordIdentifier[] parseKeyIndex(Reader reader) { String[] stripes; + final int numBuckets = parseBucketCount(reader); try { ByteBuffer val = reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME) @@ -548,26 +673,32 @@ public SerDeStats getStats() { if (stripes[i].length() != 0) { String[] parts = stripes[i].split(","); result[i] = new RecordIdentifier(); + int writerId = Integer.parseInt(parts[1]); result[i].setValues(Long.parseLong(parts[0]), - Integer.parseInt(parts[1]), Long.parseLong(parts[2])); + AcidUtils.getBucketId(writerId, numBuckets), + writerId, Long.parseLong(parts[2])); } } return result; } - static class KeyIndexBuilder implements OrcFile.WriterCallback { StringBuilder lastKey = new StringBuilder(); long lastTransaction; - int lastBucket; + int lastWriterId; long lastRowId; + private final int numBuckets; AcidStats acidStats = new AcidStats(); + KeyIndexBuilder(int numBuckets) { + this.numBuckets = numBuckets; + assert numBuckets >= 0 : "numBuckets=" + numBuckets;//throw instead? + } @Override public void preStripeWrite(OrcFile.WriterContext context ) throws IOException { lastKey.append(lastTransaction); lastKey.append(','); - lastKey.append(lastBucket); + lastKey.append(lastWriterId); lastKey.append(','); lastKey.append(lastRowId); lastKey.append(';'); @@ -580,9 +711,12 @@ public void preFooterWrite(OrcFile.WriterContext context UTF8.encode(lastKey.toString())); context.getWriter().addUserMetadata(OrcAcidUtils.ACID_STATS, UTF8.encode(acidStats.serialize())); + context.getWriter().addUserMetadata(ACID_BUCKET_COUNT, + UTF8.encode(Integer.toString(numBuckets))); } - - void addKey(int op, long transaction, int bucket, long rowId) { + //ToDo: we don't need acidStats in Acid 2.0 since num rows in file == num inserts + // (deletes for delete delta) + void addKey(int op, long transaction, int writerId, long rowId) { switch (op) { case INSERT_OPERATION: acidStats.inserts += 1; @@ -597,7 +731,7 @@ void addKey(int op, long transaction, int bucket, long rowId) { throw new IllegalArgumentException("Unknown operation " + op); } lastTransaction = transaction; - lastBucket = bucket; + lastWriterId = writerId; lastRowId = rowId; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680e26..c58e9537b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -409,6 +409,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) new RecordIdentifier( originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction, bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket, + bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket,//TODO: HACK! rowId != null ? 
(int) rowId[firstValidIndex] : repeatedRowId); // Get the last valid row in the batch still available. @@ -417,6 +418,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) new RecordIdentifier( originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket, + bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket,//TODO: HACK! rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId); // We must iterate over all the delete records, until we find one record with @@ -436,6 +438,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) currRecordIdInBatch.setValues( (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction, (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket, + (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket,//TODO: HACK (rowId != null) ? rowId[currIndex] : repeatedRowId); if (deleteRecordKey.compareRow(currRecordIdInBatch) == 0) { @@ -551,6 +554,9 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { if (indexPtrInBatch >= batch.size) { // We have exhausted our current batch, read the next batch. if (recordReader.nextBatch(batch)) { + /* + * todo: this check only happens for 2nd batch + * */ // Whenever we are reading a batch, we must ensure that all the records in the batch // have the same bucket id as the bucket id of the split. If not, throw exception. // NOTE: this assertion might not hold, once virtual bucketing is in place. However, @@ -621,6 +627,17 @@ public int compareTo(CompressedOtid other) { } } + /** + * This is a bit problematic: in order to load ColumnizedDeleteEventRegistry we still open + * all delete deltas at once - possibly causing OOM, same as for {@link SortMergedDeleteEventRegistry} + * which uses {@link OrcRawRecordMerger}. Why not load all delete_deltas sequentially? Each + * delete_delta is sorted by {@link RecordIdentifier} so we could create a BTree-like structure where the + * 1st level is an array of originalTransactionId where each entry points at an array + * of bucketIds where each entry points at an array of rowIds. We could probably use ArrayList to + * manage insertion as the structure is built (LinkedList?).
This should reduce the memory footprint + * (as far as OrcReaders go, down to a single open reader) + * + */ private TreeMap sortMerger; private long rowIds[]; private CompressedOtid compressedOtids[]; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 0541a4035a..78c511b016 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -696,15 +696,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) if(numWhenMatchedUpdateClauses > 1) { throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd()); } - assert numInsertClauses < 2; - if(numInsertClauses == 1 && numWhenMatchedUpdateClauses == 1) { - if(AcidUtils.getAcidOperationalProperties(targetTable).isSplitUpdate()) { - throw new IllegalStateException("Tables with " + - hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "=" + - TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY + " currently do not " + - "support MERGE with both Insert and Update clauses."); - } - } + assert numInsertClauses < 2: "too many Insert clauses"; } if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 4716adc945..c107b46e5f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -92,6 +92,7 @@ private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; private long txnId = 0; // transaction id for this operation private int statementId = -1; + private int numBuckets = -1; private transient Table table; private Path destPath; @@ -509,4 +510,13 @@ public FileSinkOperatorExplainVectorization getFileSinkVectorization() { } return new FileSinkOperatorExplainVectorization(vectorDesc); } + public void setNumBuckets() { + if(table == null) { + throw new IllegalStateException("no 'table' object"); + } + this.numBuckets = table.getNumBuckets(); + } + public int getNumBuckets() { + return numBuckets; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index f83b6db8bf..ecf2807cfd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -658,9 +659,16 @@ public void map(WritableComparable key, CompactorInputSplit split, AcidInputFormat.RawReader reader = aif.getRawReader(jobConf, isMajor, split.getBucket(), txnList, split.getBaseDir(), split.getDeltaDirs()); + int numBuckets = reader.getNumBuckets(); + int numBucketsFromMetadata = jobConf.getInt(NUM_BUCKETS, OrcRecordUpdater.BC_UNKNOWN); + if(numBuckets != OrcRecordUpdater.BC_UNKNOWN && numBuckets !=
numBucketsFromMetadata) { + throw new IllegalStateException("numBuckets=" + numBuckets + + " but numBucketsFromMetadata=" + numBucketsFromMetadata); + } + numBuckets = numBucketsFromMetadata; RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); - getWriter(reporter, reader.getObjectInspector(), split.getBucket()); + getWriter(reporter, reader.getObjectInspector(), split.getBucket(), numBuckets); AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties(jobConf); @@ -670,7 +678,7 @@ public void map(WritableComparable key, CompactorInputSplit split, // that is used to write all the delete events (in case of minor compaction only). For major // compaction, history is not required to be maintained hence the delete events are processed // but not re-written separately. - getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket()); + getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket(), numBuckets); } while (reader.next(identifier, value)) { @@ -704,7 +712,7 @@ public void close() throws IOException { } private void getWriter(Reporter reporter, ObjectInspector inspector, - int bucket) throws IOException { + int bucket, int numBuckets) throws IOException { if (writer == null) { AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); options.inspector(inspector) @@ -715,6 +723,7 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) + .numBuckets(numBuckets) .statementId(-1);//setting statementId == -1 makes compacted delta files use //delta_xxxx_yyyy format @@ -728,7 +737,7 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, } private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, - int bucket) throws IOException { + int bucket, int numBuckets) throws IOException { if (deleteEventWriter == null) { AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); options.inspector(inspector) @@ -740,6 +749,7 @@ private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) + .numBuckets(numBuckets) .statementId(-1);//setting statementId == -1 makes compacted delta files use //delta_xxxx_yyyy format diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java index 461ef86b83..48bf60f681 100755 --- ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java @@ -220,7 +220,9 @@ public IntWritable evaluate(RecordIdentifier i) { if (i == null) { return null; } else { - intWritable.set(i.getBucketId()); + intWritable.set(i.getBucketId());//TODO: should this now do i.getBucketId()%numBuckets ???? or should that already be in ReduceSinkOperator.... I think the latter but VERIFY + //stash numBuckets in RecordIdentifier? 
Should be cheap with Vectorization of structs since it has the same value for the whole table + //seems lame return intWritable; } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 7c66955e14..2d96d247ac 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -492,53 +492,10 @@ private static void pause(int timeMillis) { * sorts rows in dictionary order */ private List stringifyValues(int[][] rowsIn) { - assert rowsIn.length > 0; - int[][] rows = rowsIn.clone(); - Arrays.sort(rows, new RowComp()); - List rs = new ArrayList(); - for(int[] row : rows) { - assert row.length > 0; - StringBuilder sb = new StringBuilder(); - for(int value : row) { - sb.append(value).append("\t"); - } - sb.setLength(sb.length() - 1); - rs.add(sb.toString()); - } - return rs; - } - private static final class RowComp implements Comparator { - @Override - public int compare(int[] row1, int[] row2) { - assert row1 != null && row2 != null && row1.length == row2.length; - for(int i = 0; i < row1.length; i++) { - int comp = Integer.compare(row1[i], row2[i]); - if(comp != 0) { - return comp; - } - } - return 0; - } + return TestTxnCommands2.stringifyValues(rowsIn); } private String makeValuesClause(int[][] rows) { - assert rows.length > 0; - StringBuilder sb = new StringBuilder("values"); - for(int[] row : rows) { - assert row.length > 0; - if(row.length > 1) { - sb.append("("); - } - for(int value : row) { - sb.append(value).append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - if(row.length > 1) { - sb.append(")"); - } - sb.append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - return sb.toString(); + return TestTxnCommands2.makeValuesClause(rows); } private List runStatementOnDriver(String stmt) throws Exception { @@ -558,7 +515,6 @@ private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throw throw new RuntimeException("Didn't get expected failure!"); } -// @Ignore @Test public void exchangePartition() throws Exception { runStatementOnDriver("create database ex1"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 5786c4f659..254a998e8e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -87,19 +87,27 @@ protected Driver d; protected static enum Table { ACIDTBL("acidTbl"), - ACIDTBLPART("acidTblPart"), + ACIDTBLPART("acidTblPart", "p"), NONACIDORCTBL("nonAcidOrcTbl"), - NONACIDPART("nonAcidPart"), - NONACIDPART2("nonAcidPart2"), - ACIDNESTEDPART("acidNestedPart"); + NONACIDPART("nonAcidPart", "p"), + NONACIDPART2("nonAcidPart2", "p2"), + ACIDNESTEDPART("acidNestedPart", "p,q"); private final String name; + private final String partitionColumns; @Override public String toString() { return name; } + String getPartitionColumns() { + return partitionColumns; + } Table(String name) { + this(name, null); + } + Table(String name, String partitionColumns) { this.name = name; + this.partitionColumns = partitionColumns; } } @@ -599,7 +607,7 @@ public void testNonAcidToAcidConversion3() throws Exception { List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); int [][] resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + 
Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -615,7 +623,7 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -686,7 +694,7 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -722,7 +730,7 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -1438,12 +1446,13 @@ public void testMergeWithPredicate() throws Exception { String query = "merge into " + Table.ACIDTBL + " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " + "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " + - "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)"; + "WHEN NOT MATCHED and s.b2 >= 8 THEN INSERT VALUES(s.a2, s.b2)"; runStatementOnDriver(query); r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}}; + int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{8,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); + assertUniqueID(Table.ACIDTBL); } /** @@ -1471,6 +1480,7 @@ public void testMerge2() throws Exception { r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); + assertUniqueID(Table.ACIDTBL); } /** @@ -1497,27 +1507,21 @@ public void testMerge3() throws Exception { int[][] rExpected = {{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); } - /** - * https://hortonworks.jira.com/browse/BUG-66580 - * @throws Exception - */ - @Ignore @Test public void testMultiInsert() throws Exception { - runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + - "partitioned by (z int) clustered by (a) into 2 buckets " + - "stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("create temporary table if not exists data1 (x int)"); -// runStatementOnDriver("create temporary table if not exists data2 (x int)"); - runStatementOnDriver("insert into data1 
values (1),(2),(3)"); -// runStatementOnDriver("insert into data2 values (4),(5),(6)"); d.destroy(); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); d = new Driver(hiveConf); - List r = runStatementOnDriver(" from data1 " + - "insert into srcpart partition(z) select 0,0,1,x " + - "insert into srcpart partition(z=1) select 0,0,1"); + + runStatementOnDriver(" from data1 " + + "insert into " + Table.ACIDTBLPART + " partition(p) select 0,x, 'p' || x " + + + "insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0,x"); + List r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[p1\t0\t1, p1\t0\t1, p1\t0\t2, p1\t0\t3, p2\t0\t2, p3\t0\t3]", r.toString()); + assertUniqueID(Table.ACIDTBLPART); } /** * Investigating DP and WriteEntity, etc @@ -1583,6 +1587,7 @@ public void testDynamicPartitionsMerge() throws Exception { r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); String result= r1.toString(); Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result); + assertUniqueID(Table.ACIDTBLPART); } /** * Using nested partitions and thus DummyPartition @@ -1605,6 +1610,7 @@ public void testDynamicPartitionsMerge2() throws Exception { "when not matched then insert values(s.a, s.b, 3,4)"); r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b"); Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1); + assertUniqueID(Table.ACIDNESTEDPART); } @Ignore("Covered elsewhere") @Test @@ -1661,7 +1667,7 @@ public void testValuesSource() throws Exception { } return rs; } - private static final class RowComp implements Comparator { + static class RowComp implements Comparator { @Override public int compare(int[] row1, int[] row2) { assert row1 != null && row2 != null && row1.length == row2.length; @@ -1674,7 +1680,7 @@ public int compare(int[] row1, int[] row2) { return 0; } } - String makeValuesClause(int[][] rows) { + static String makeValuesClause(int[][] rows) { assert rows.length > 0; StringBuilder sb = new StringBuilder("values"); for(int[] row : rows) { @@ -1705,4 +1711,19 @@ String makeValuesClause(int[][] rows) { d.getResults(rs); return rs; } + final void assertUniqueID(Table table) throws Exception { + String partCols = table.getPartitionColumns(); + //check to make sure there are no duplicate ROW__IDs - HIVE-16832 + StringBuilder sb = new StringBuilder("select "); + if(partCols != null && partCols.length() > 0) { + sb.append(partCols).append(","); + } + sb.append(" ROW__ID, count(*) from ").append(table).append(" group by "); + if(partCols != null && partCols.length() > 0) { + sb.append(partCols).append(","); + } + sb.append("ROW__ID having count(*) > 1"); + List r = runStatementOnDriver(sb.toString()); + Assert.assertTrue(r.toString(),r.size() == 0); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java index ea5ecbc842..520e958af3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java @@ -542,45 +542,4 @@ public void testNonAcidToAcidConversion3() throws Exception { resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } - @Ignore("HIVE-14947") - @Test - @Override - public 
void testDynamicPartitionsMerge() throws Exception {} - @Ignore("HIVE-14947") - @Test - @Override - public void testDynamicPartitionsMerge2() throws Exception {} - @Ignore("HIVE-14947") - @Test - @Override - public void testMerge() throws Exception {} - - /** - * todo: remove this test once HIVE-14947 is done (parent class has a better version) - */ - @Test - @Override - public void testMerge2() throws Exception { - int[][] baseValsOdd = {{5,5},{11,11}}; - int[][] baseValsEven = {{2,2},{4,44}}; - runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd)); - runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven)); - int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); - List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(vals), r); - String query = "merge into " + Table.ACIDTBL + - " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " + - "WHEN MATCHED THEN UPDATE set b = source.b2 "; - r = runStatementOnDriver(query); - - r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,2},{4,44},{5,5},{7,8}}; - Assert.assertEquals(stringifyValues(rExpected), r); - - } - @Ignore("HIVE-14947") - @Test - @Override - public void testMergeWithPredicate() throws Exception {} } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java index 44a94127ec..34df95d29c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hive.ql; import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.List; + /** * Same as TestTxnCommands2WithSplitUpdate but tests ACID tables with vectorization turned on by * default, and having 'transactional_properties' set to 'default'. This specifically tests the @@ -48,5 +51,28 @@ public void testFailureOnAlteringTransactionalProperties() throws Exception { // To not override this test, that temporary table needs to be renamed. However, as // mentioned this does not serve any purpose, as this test does not relate to vectorization. 
} + @Test + public void testMultiInsertVectorized() throws Exception { + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("insert into data1 values (1)"); + d.destroy(); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + d = new Driver(hiveConf); + + runStatementOnDriver(" from data1 " + + "insert into " + Table.ACIDTBLPART + " partition(p) select 0, 0, 'p' || x " + + + "insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0, 1"); + /*w/o the fix we'd have 2 rows with the same ID in the same bucket of the same partition + * acidtblpart/p=p1/delta_0000015_0000015_0000/bucket_00000 [length: 578] {"operation":0,"originalTransaction":15,"bucket":0,"rowId":0,"currentTransaction":15,"row":{"_col0":0,"_col1":1}} + acidtblpart/p=p1/delta_0000015_0000015_0001/bucket_00000 [length: 568] {"operation":0,"originalTransaction":15,"bucket":0,"rowId":0,"currentTransaction":15,"row":{"_col0":0,"_col1":0}}*/ + List r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[p1\t0\t0, p1\t0\t1]", r.toString()); + assertUniqueID(Table.ACIDTBLPART); + runStatementOnDriver("delete from " + Table.ACIDTBLPART); + r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[]", r.toString()); + + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index a8d7c9c461..d2670e85d3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -245,14 +245,14 @@ private void setupData(DataFormat format) { (i < 5) ? new Text("Monday") : new Text("Tuesday")); break; case WITH_RECORD_ID: - r = new RowWithRecID(new RecordIdentifier(1, 1, i), + r = new RowWithRecID(new RecordIdentifier(1, 1, 1, i), (i < 5) ? new Text("Monday") : new Text("Tuesday")); break; case WITH_RECORD_ID_AND_PARTITION_VALUE: r = new RowWithPartNRecID( new Text("its fleect was white as snow"), (i < 5) ? 
new Text("Monday") : new Text("Tuesday"), - new RecordIdentifier(1, 1, i)); + new RecordIdentifier(1, 1, 1, i)); break; default: throw new RuntimeException("Unknown data format"); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java index a2bd8da3bd..3f1029ea51 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java @@ -29,21 +29,21 @@ public class TestRecordIdentifier { @Test public void TestOrdering() throws Exception { - RecordIdentifier left = new RecordIdentifier(100, 200, 1200); + RecordIdentifier left = new RecordIdentifier(100, 200, 200, 1200); RecordIdentifier right = new RecordIdentifier(); - right.setValues(100L, 200, 1000L); + right.setValues(100L, 200, 200, 1000L); assertTrue(right.compareTo(left) < 0); assertTrue(left.compareTo(right) > 0); left.set(right); assertTrue(right.compareTo(left) == 0); right.setRowId(2000); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3); - right.setValues(100, 2, 3); + left.setValues(1, 2, 2, 3); + right.setValues(100, 2, 2, 3); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3); - right.setValues(1, 100, 3); + left.setValues(1, 2, 2, 3); + right.setValues(1, 100, 100, 3); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); } @@ -56,10 +56,10 @@ public void testHashEquals() throws Exception { long currTxn = origTxn + ThreadLocalRandom.current().nextLong(0, 10000000000L); int stmtId = ThreadLocalRandom.current().nextInt(1, 512); - RecordIdentifier left = new RecordIdentifier(origTxn, bucketId, rowId); - RecordIdentifier right = new RecordIdentifier(origTxn, bucketId, rowId); - OrcRawRecordMerger.ReaderKey rkLeft = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, rowId, currTxn, stmtId); - OrcRawRecordMerger.ReaderKey rkRight = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, rowId, currTxn, stmtId); + RecordIdentifier left = new RecordIdentifier(origTxn, bucketId, bucketId, rowId); + RecordIdentifier right = new RecordIdentifier(origTxn, bucketId, bucketId, rowId); + OrcRawRecordMerger.ReaderKey rkLeft = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, bucketId, rowId, currTxn, stmtId); + OrcRawRecordMerger.ReaderKey rkRight = new OrcRawRecordMerger.ReaderKey(origTxn, bucketId, bucketId, rowId, currTxn, stmtId); assertEquals("RecordIdentifier.equals", left, right); assertEquals("RecordIdentifier.hashCode", left.hashCode(), right.hashCode()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index bb7985711f..566e4c15d8 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2240,7 +2240,8 @@ public void testVectorizationWithAcid() throws Exception { Path partDir = new Path(conf.get("mapred.input.dir")); OrcRecordUpdater writer = new OrcRecordUpdater(partDir, new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir)); + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir) + .numBuckets(3)); for (int i = 0; i < 100; ++i) { BigRow row = new BigRow(i); writer.insert(10, row); @@ -2397,7 +2398,8 @@ public void testCombinationInputFormatWithAcid() throws 
Exception { // write a base file in partition 0 OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0], new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0])); + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0]) + .numBuckets(BUCKETS)); for(int i=0; i < 10; ++i) { writer.insert(10, new MyRow(i, 2 * i)); } @@ -2410,7 +2412,8 @@ public void testCombinationInputFormatWithAcid() throws Exception { // write a delta file in partition 0 writer = new OrcRecordUpdater(partDir[0], new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0])); + .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0]) + .numBuckets(BUCKETS)); for(int i=10; i < 20; ++i) { writer.insert(10, new MyRow(i, 2*i)); } @@ -2445,14 +2448,14 @@ public void testCombinationInputFormatWithAcid() throws Exception { assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000", split.getPath().toString()); assertEquals(0, split.getStart()); - assertEquals(607, split.getLength()); + assertEquals(624, split.getLength()); split = (HiveInputFormat.HiveInputSplit) splits[1]; assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", split.inputFormatClassName()); assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001", split.getPath().toString()); assertEquals(0, split.getStart()); - assertEquals(629, split.getLength()); + assertEquals(646, split.getLength()); CombineHiveInputFormat.CombineHiveInputSplit combineSplit = (CombineHiveInputFormat.CombineHiveInputSplit) splits[2]; assertEquals(BUCKETS, combineSplit.getNumPaths()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 1ce1bfb1dd..f13650fb5c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -76,9 +76,9 @@ //todo: why is statementId -1? 
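Both the TestRecordIdentifier changes above and the testOrdering() that follows only build keys whose writerId mirrors the bucketId, so at the RecordIdentifier level the fixtures pin down ordering by transactionId, then bucketId, then rowId (ReaderKey additionally breaks ties on currentTransaction and statementId, which the sketch below does not model). AcidKeySketch is a hypothetical illustration class, not Hive's RecordIdentifier or ReaderKey.

import java.util.Comparator;

/** Hypothetical 4-field key, used only to illustrate the ordering the test fixtures exercise. */
final class AcidKeySketch {
  final long transactionId;
  final int bucketId;
  final int writerId; // mirrors bucketId in the fixtures, so its effect on ordering is not observable here
  final long rowId;

  AcidKeySketch(long transactionId, int bucketId, int writerId, long rowId) {
    this.transactionId = transactionId;
    this.bucketId = bucketId;
    this.writerId = writerId;
    this.rowId = rowId;
  }

  /** Orders by transactionId, then bucketId, then rowId, matching the assertions in the tests. */
  static final Comparator<AcidKeySketch> ORDER = Comparator
      .comparingLong((AcidKeySketch k) -> k.transactionId)
      .thenComparingInt(k -> k.bucketId)
      .thenComparingLong(k -> k.rowId);

  public static void main(String[] args) {
    AcidKeySketch left = new AcidKeySketch(100, 200, 200, 1200);
    AcidKeySketch right = new AcidKeySketch(100, 200, 200, 1000);
    System.out.println(ORDER.compare(right, left) < 0);  // true: smaller rowId sorts first
    System.out.println(ORDER.compare(
        new AcidKeySketch(1, 2, 2, 3), new AcidKeySketch(100, 2, 2, 3)) < 0);   // true: transactionId first
    System.out.println(ORDER.compare(
        new AcidKeySketch(1, 2, 2, 3), new AcidKeySketch(1, 100, 100, 3)) < 0); // true: then bucketId
  }
}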
@Test public void testOrdering() throws Exception { - ReaderKey left = new ReaderKey(100, 200, 1200, 300); + ReaderKey left = new ReaderKey(100, 200, 200, 1200, 300); ReaderKey right = new ReaderKey(); - right.setValues(100, 200, 1000, 200,1); + right.setValues(100, 200, 200, 1000, 200,1); assertTrue(right.compareTo(left) < 0); assertTrue(left.compareTo(right) > 0); assertEquals(false, left.equals(right)); @@ -87,21 +87,21 @@ public void testOrdering() throws Exception { assertEquals(true, right.equals(left)); right.setRowId(2000); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 4,-1); - right.setValues(100, 2, 3, 4,-1); + left.setValues(1, 2, 2, 3, 4,-1); + right.setValues(100, 2, 2, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 4,-1); - right.setValues(1, 100, 3, 4,-1); + left.setValues(1, 2, 2, 3, 4,-1); + right.setValues(1, 100, 100, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 100,-1); - right.setValues(1, 2, 3, 4,-1); + left.setValues(1, 2, 2, 3, 100,-1); + right.setValues(1, 2, 2, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); // ensure that we are consistent when comparing to the base class - RecordIdentifier ri = new RecordIdentifier(1, 2, 3); + RecordIdentifier ri = new RecordIdentifier(1, 2, 2, 3); assertEquals(1, ri.compareTo(left)); assertEquals(-1, left.compareTo(ri)); assertEquals(false, ri.equals(left)); @@ -185,8 +185,8 @@ private Reader createMockReader() throws IOException { public void testReaderPair() throws Exception { ReaderKey key = new ReaderKey(); Reader reader = createMockReader(); - RecordIdentifier minKey = new RecordIdentifier(10, 20, 30); - RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60); + RecordIdentifier minKey = new RecordIdentifier(10, 20, 20, 30); + RecordIdentifier maxKey = new RecordIdentifier(40, 50, 50, 60); ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey, new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; @@ -287,8 +287,8 @@ private Reader createMockOriginalReader() throws IOException { public void testOriginalReaderPair() throws Exception { ReaderKey key = new ReaderKey(); Reader reader = createMockOriginalReader(); - RecordIdentifier minKey = new RecordIdentifier(0, 10, 1); - RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3); + RecordIdentifier minKey = new RecordIdentifier(0, 10, 10, 1); + RecordIdentifier maxKey = new RecordIdentifier(0, 10, 10, 3); boolean[] includes = new boolean[]{true, true}; ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey, new Reader.Options().include(includes)); @@ -427,8 +427,8 @@ public void testNewBase() throws Exception { RecordReader rr = merger.getCurrentReader().recordReader; assertEquals(0, merger.getOtherReaders().size()); - assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey()); - assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey()); + assertEquals(new RecordIdentifier(10, 20, 20, 30), merger.getMinKey()); + assertEquals(new RecordIdentifier(40, 50, 50, 60), merger.getMaxKey()); RecordIdentifier id = merger.createKey(); OrcStruct event = merger.createValue(); @@ -482,7 +482,7 @@ public void testNewBase() throws Exception { MyRow(String val, long rowId, long origTxn, int bucket) { col1 = new Text(val); - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + ROW__ID = new RecordIdentifier(origTxn, bucket, bucket, 
rowId); } static String getColumnNamesProperty() { @@ -515,7 +515,7 @@ public void testEmpty() throws Exception { // write the empty base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .inspector(inspector).bucket(BUCKET).writingBase(true) - .maximumTransactionId(100).finalDestination(root); + .maximumTransactionId(100).finalDestination(root).numBuckets(3); of.getRecordUpdater(root, options).close(false); ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE); @@ -549,6 +549,7 @@ public void testNewBaseAndDelta() throws Exception { } private void testNewBaseAndDelta(boolean use130Format) throws Exception { final int BUCKET = 10; + final int BUCKET_COUNT = 16; String[] values = new String[]{"first", "second", "third", "fourth", "fifth", "sixth", "seventh", "eighth", "ninth", "tenth"}; @@ -565,7 +566,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { // write the base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) - .inspector(inspector).bucket(BUCKET).finalDestination(root); + .inspector(inspector).bucket(BUCKET).finalDestination(root).numBuckets(BUCKET_COUNT); if(!use130Format) { options.statementId(-1); } @@ -615,64 +616,64 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 0, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 0, 200), id); assertEquals("update 1", getValue(event)); assertFalse(merger.isDelete(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 1, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 1, 0), id); assertEquals("second", getValue(event)); assertFalse(merger.isDelete(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 2, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 2, 200), id); assertEquals("update 2", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 3, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 3, 200), id); assertEquals("update 3", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 4, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 4, 0), id); assertEquals("fifth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 5, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 5, 0), id); assertEquals("sixth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 6, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 6, 0), id); assertEquals("seventh", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, 
BUCKET, 7, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 7, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertTrue(merger.isDelete(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 8, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 9, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 9, 0), id); assertEquals("tenth", getValue(event)); assertEquals(false, merger.next(id, event)); @@ -686,90 +687,90 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 0, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 0, 200), id); assertEquals("update 1", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 0, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 0, 0), id); assertEquals("first", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 1, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 1, 0), id); assertEquals("second", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 2, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 2, 200), id); assertEquals("update 2", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 2, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 2, 0), id); assertEquals("third", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 3, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 3, 200), id); assertEquals("update 3", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 3, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 3, 0), id); assertEquals("fourth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 4, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 4, 0), id); assertEquals("fifth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 5, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 5, 0), id); assertEquals("sixth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, 
OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 6, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 6, 0), id); assertEquals("seventh", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 7, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 7, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 7, 0), id); assertEquals("eighth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 8, 200), id); assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 8, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 8, 0), id); assertEquals("ninth", getValue(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, 9, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, 9, 0), id); assertEquals("tenth", getValue(event)); assertEquals(false, merger.next(id, event)); @@ -786,7 +787,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { LOG.info("id = " + id + "event = " + event); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); - assertEquals(new ReaderKey(0, BUCKET, i, 0), id); + assertEquals(new ReaderKey(0, BUCKET, BUCKET, i, 0), id); assertEquals(values[i], getValue(event)); } @@ -818,11 +819,11 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { this.mytext = new Text(mytext); this.myfloat = myfloat; this.mydouble = mydouble; - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + ROW__ID = new RecordIdentifier(origTxn, bucket, bucket, rowId); } BigRow(long rowId, long origTxn, int bucket) { - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + ROW__ID = new RecordIdentifier(origTxn, bucket, bucket, rowId); } static String getColumnNamesProperty() { @@ -841,6 +842,7 @@ static String getColumnTypesProperty() { @Test public void testRecordReaderOldBaseAndDelta() throws Exception { final int BUCKET = 10; + final int BUCKET_COUNT = 16; Configuration conf = new Configuration(); OrcOutputFormat of = new OrcOutputFormat(); FileSystem fs = FileSystem.getLocal(conf); @@ -881,7 +883,7 @@ public synchronized void addedRow(int rows) throws IOException { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5) - .finalDestination(root); + .finalDestination(root).numBuckets(BUCKET_COUNT); RecordUpdater ru = of.getRecordUpdater(root, options); values = new String[]{"0.0", null, null, "1.1", null, null, null, "ignore.7"}; @@ -944,6 +946,7 @@ public synchronized void addedRow(int rows) throws IOException { @Test public void testRecordReaderNewBaseAndDelta() throws Exception { final 
int BUCKET = 11; + final int BUCKET_COUNT = 16; Configuration conf = new Configuration(); OrcOutputFormat of = new OrcOutputFormat(); FileSystem fs = FileSystem.getLocal(conf); @@ -973,7 +976,7 @@ public synchronized void addedRow(int rows) throws IOException { OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions) new OrcRecordUpdater.OrcOptions(conf) .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) - .bucket(BUCKET).inspector(inspector).filesystem(fs); + .bucket(BUCKET).inspector(inspector).filesystem(fs).numBuckets(BUCKET_COUNT); options.orcOptions(OrcFile.writerOptions(conf) .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE) .memory(mgr).batchSize(2)); @@ -1067,7 +1070,7 @@ public void testRecordReaderDelta() throws Exception { new AcidOutputFormat.Options(conf) .bucket(BUCKET).inspector(inspector).filesystem(fs) .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) - .finalDestination(root); + .finalDestination(root).numBuckets(3); RecordUpdater ru = of.getRecordUpdater(root, options); String[] values = new String[]{"a", "b", "c", "d", "e"}; for(int i=0; i < values.length; ++i) { @@ -1137,7 +1140,7 @@ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Except AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) - .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root); + .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root).numBuckets(3); if(!use130Format) { options.statementId(-1); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index 67c473e188..fd0b5db219 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -62,7 +62,7 @@ public void testAccessors() throws Exception { OrcRecordUpdater.getOperation(event)); assertEquals(50, OrcRecordUpdater.getOriginalTransaction(event)); assertEquals(100, OrcRecordUpdater.getCurrentTransaction(event)); - assertEquals(200, OrcRecordUpdater.getBucket(event)); + assertEquals(200, OrcRecordUpdater.getWriterId(event)); assertEquals(300, OrcRecordUpdater.getRowId(event)); } @@ -80,7 +80,7 @@ public void testAccessors() throws Exception { MyRow(String val, long rowId, long origTxn, int bucket) { field = new Text(val); - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + ROW__ID = new RecordIdentifier(origTxn, bucket, bucket, rowId); } } @@ -104,7 +104,8 @@ public void testWriter() throws Exception { .maximumTransactionId(19) .inspector(inspector) .reporter(Reporter.NULL) - .finalDestination(root); + .finalDestination(root) + .numBuckets(16); RecordUpdater updater = new OrcRecordUpdater(root, options); updater.insert(11, new MyRow("first")); updater.insert(11, new MyRow("second")); @@ -143,27 +144,27 @@ public void testWriter() throws Exception { OrcRecordUpdater.getOperation(row)); assertEquals(11, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(11, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, OrcRecordUpdater.getWriterId(row)); assertEquals(0, OrcRecordUpdater.getRowId(row)); assertEquals("first", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(1, 
OrcRecordUpdater.getRowId(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, OrcRecordUpdater.getWriterId(row)); assertEquals("second", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(2, OrcRecordUpdater.getRowId(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, OrcRecordUpdater.getWriterId(row)); assertEquals("third", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(12, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(12, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(10, OrcRecordUpdater.getBucket(row)); + assertEquals(10, OrcRecordUpdater.getWriterId(row)); assertEquals(0, OrcRecordUpdater.getRowId(row)); assertEquals("fourth", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); @@ -209,7 +210,8 @@ public void testWriterTblProperties() throws Exception { .inspector(inspector) .reporter(Reporter.NULL) .finalDestination(root) - .tableProperties(tblProps); + .tableProperties(tblProps) + .numBuckets(16); RecordUpdater updater = new OrcRecordUpdater(root, options); updater.insert(11, new MyRow("first")); updater.insert(11, new MyRow("second")); @@ -251,7 +253,8 @@ public void testUpdates() throws Exception { .inspector(inspector) .reporter(Reporter.NULL) .recordIdColumn(1) - .finalDestination(root); + .finalDestination(root) + .numBuckets(32); RecordUpdater updater = new OrcRecordUpdater(root, options); updater.update(100, new MyRow("update", 30, 10, bucket)); updater.delete(100, new MyRow("", 60, 40, bucket)); @@ -272,7 +275,7 @@ public void testUpdates() throws Exception { OrcRecordUpdater.getOperation(row)); assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(20, OrcRecordUpdater.getBucket(row)); + assertEquals(20, OrcRecordUpdater.getWriterId(row)); assertEquals(30, OrcRecordUpdater.getRowId(row)); assertEquals("update", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); @@ -280,7 +283,7 @@ public void testUpdates() throws Exception { row = (OrcStruct) rows.next(null); assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(20, OrcRecordUpdater.getBucket(row)); + assertEquals(20, OrcRecordUpdater.getWriterId(row)); assertEquals(60, OrcRecordUpdater.getRowId(row)); assertNull(OrcRecordUpdater.getRow(row)); assertEquals(false, rows.hasNext()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf13129b8..0ff4fb474b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -72,7 +72,7 @@ DummyRow(long val, long rowId, long origTxn, int bucket) { field = new LongWritable(val); - ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + ROW__ID = new RecordIdentifier(origTxn, bucket, bucket, rowId); } static String getColumnNamesProperty() { @@ -118,7 +118,8 @@ public void setup() throws Exception { .inspector(inspector) .reporter(Reporter.NULL) .recordIdColumn(1) - .finalDestination(root); + .finalDestination(root) + .numBuckets(2); RecordUpdater updater = new 
OrcRecordUpdater(root, options); // Create a single insert delta with 150,000 rows, with 15000 rowIds per original transaction id. for (long i = 1; i <= NUM_OTID; ++i) { diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index bbed591144..2930f2f7f5 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Text; @@ -330,7 +331,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, out.writeInt(numRecords); } else { for (int i = 0; i < numRecords; i++) { - RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i); + RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, bucket, i); ri.write(out); out.writeBytes("mary had a little lamb its fleece was white as snow\n"); } @@ -419,6 +420,11 @@ public boolean isDelete(Text value) { } @Override + public int getNumBuckets() { + return OrcRecordUpdater.BC_UNKNOWN; + } + + @Override public boolean next(RecordIdentifier identifier, Text text) throws IOException { if (is == null) { // Open the next file
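The mock reader above returns OrcRecordUpdater.BC_UNKNOWN from getNumBuckets(), which exercises the reconciliation added to CompactorMR.map() earlier in this patch: a bucket count read from the file must either be unknown or agree with the count taken from table metadata via the job conf, and the metadata value is what gets used. Below is a minimal sketch of that check, assuming BC_UNKNOWN is the -1 sentinel described in the parseBucketCount() javadoc; BucketCountReconciler and resolve() are hypothetical names, not part of the patch.

/** Illustrative sketch of the compactor's numBuckets sanity check; not part of the patch. */
final class BucketCountReconciler {
  static final int BC_UNKNOWN = -1; // stands in for OrcRecordUpdater.BC_UNKNOWN in this sketch

  /** The metadata value wins; the file value only serves as a fail-fast consistency check. */
  static int resolve(int numBucketsFromFile, int numBucketsFromMetadata) {
    if (numBucketsFromFile != BC_UNKNOWN && numBucketsFromFile != numBucketsFromMetadata) {
      throw new IllegalStateException("numBuckets=" + numBucketsFromFile
          + " but numBucketsFromMetadata=" + numBucketsFromMetadata);
    }
    return numBucketsFromMetadata;
  }

  public static void main(String[] args) {
    System.out.println(resolve(BC_UNKNOWN, 4)); // 4: old files carry no bucket count
    System.out.println(resolve(4, 4));          // 4: values agree
  }
}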