diff --git streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java index a99fdba2c1..6ab3ffeb8c 100644 --- streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java +++ streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -151,6 +152,9 @@ public long getCurrentTxnId() { } public void commit() throws StreamingException { - commitWithPartitions(null); + commit(null); + } + public void commit(Set<String> partitions) throws StreamingException { + commit(partitions, null, null); } } diff --git streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index f79b844826..365592be23 100644 --- streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -146,6 +146,7 @@ public String toString() { private boolean manageTransactions; private int countTransactions = 0; private Set<String> partitions; + private volatile Long tableId; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -574,12 +575,18 @@ public void beginTransaction() throws StreamingException { @Override public void commitTransaction() throws StreamingException { - commitTransactionWithPartition(null); + commitTransaction(null); } @Override - public void commitTransactionWithPartition(Set<String> partitions) + public void commitTransaction(Set<String> partitions) throws StreamingException { + commitTransaction(partitions, null, null); + } + + @Override + public void commitTransaction(Set<String> partitions, String key, + String value) throws StreamingException { checkState(); Set<String> createdPartitions = new 
HashSet<>(); @@ -598,7 +605,7 @@ public void commitTransactionWithPartition(Set<String> partitions) connectionStats.incrementTotalPartitions(partitions.size()); } - currentTransactionBatch.commitWithPartitions(createdPartitions); + currentTransactionBatch.commit(createdPartitions, key, value); this.partitions.addAll( currentTransactionBatch.getPartitions()); connectionStats.incrementCreatedPartitions(createdPartitions.size()); @@ -688,6 +695,18 @@ Long getCurrentTxnId() { return currentTransactionBatch.getCurrentTxnId(); } + long getTableId() throws TException { + if (tableId == null) { + synchronized (HiveStreamingConnection.class) { + if (tableId == null) { + tableId = getMSC().getTable(tableObject.getDbName(), + tableObject.getTableName()).getId(); + } + } + } + return tableId; + } + private HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) { HiveConf conf = new HiveConf(clazz); if (metaStoreUri != null) { diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index 92016e5f68..ba4c6a5aac 100644 --- streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -66,13 +66,26 @@ void commitTransaction() throws StreamingException; /** - * Commit a transaction to make the writes visible for readers. Include - * other partitions that may have been added independently. - * + * Commits the transaction together with a key value atomically. * @param partitions - extra partitions to commit. - * @throws StreamingException - if there are errors when committing the open transaction. + * @param key - key to commit. + * @param value - value to commit. + * @throws StreamingException - if there are errors when committing + * the open transaction. 
*/ - default void commitTransactionWithPartition(@Nullable Set<String> partitions) + default void commitTransaction(@Nullable Set<String> partitions, + @Nullable String key, @Nullable String value) throws StreamingException { + throw new UnsupportedOperationException(); + } + + /** + * Commit a transaction to make the writes visible for readers. Include + * other partitions that may have been added independently. + * + * @param partitions - extra partitions to commit. + * @throws StreamingException - if there are errors when committing the open transaction. + */ + default void commitTransaction(@Nullable Set<String> partitions) throws StreamingException { + throw new UnsupportedOperationException(); + } diff --git streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java index 83b2f15df3..c0ee03426e 100644 --- streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java +++ streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java @@ -19,6 +19,8 @@ package org.apache.hive.streaming; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; + +import javax.annotation.Nullable; import java.io.InputStream; import java.util.List; import java.util.Set; @@ -45,7 +47,17 @@ * @param partitions to commit. * @throws StreamingException */ - void commitWithPartitions(Set<String> partitions) throws StreamingException; + void commit(@Nullable Set<String> partitions) throws StreamingException; + + /** + * Commits atomically together with a key and a value. + * @param partitions to commit. + * @param key to commit. + * @param value to commit. + * @throws StreamingException + */ + void commit(@Nullable Set<String> partitions, @Nullable String key, + @Nullable String value) throws StreamingException; /** * Abort a transaction. 
diff --git streaming/src/java/org/apache/hive/streaming/TransactionBatch.java streaming/src/java/org/apache/hive/streaming/TransactionBatch.java index dabbe2110e..3ba81c601a 100644 --- streaming/src/java/org/apache/hive/streaming/TransactionBatch.java +++ streaming/src/java/org/apache/hive/streaming/TransactionBatch.java @@ -244,19 +244,26 @@ private void beginNextTransactionImpl() throws StreamingException { } } - public void commitWithPartitions(Set<String> partitions) throws StreamingException { + public void commit(Set<String> partitions, String key, String value) + throws StreamingException { checkIsClosed(); boolean success = false; try { - commitImpl(partitions); + commitImpl(partitions, key, value); success = true; } finally { markDead(success); } } - private void commitImpl(Set<String> partitions) throws StreamingException { + private void commitImpl(Set<String> partitions, String key, String value) + throws StreamingException { try { + if ((key == null && value != null) || (key != null && value == null)) { + throw new StreamingException(String.format( + "If key is set, the value should be as well and vice versa," + + " key, value = %s, %s", key, value)); + } recordWriter.flush(); TxnToWriteId txnToWriteId = txnToWriteIds.get(currentTxnIndex); if (conn.isDynamicPartitioning()) { @@ -274,7 +281,13 @@ private void commitImpl(Set<String> partitions) throws StreamingException { } transactionLock.lock(); try { - conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + if (key != null) { + long tableId = conn.getTableId(); + conn.getMSC().commitTxnWithKeyValue(txnToWriteId.getTxnId(), + tableId, key, value); + } else { + conn.getMSC().commitTxn(txnToWriteId.getTxnId()); + } // increment the min txn id so that heartbeat thread will heartbeat only from the next open transaction. // the current transaction is going to committed or fail, so don't need heartbeat for current transaction. 
if (currentTxnIndex + 1 < txnToWriteIds.size()) { diff --git streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java index 68b0906b3d..75779d50c7 100644 --- streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java +++ streaming/src/java/org/apache/hive/streaming/UnManagedSingleTransaction.java @@ -69,7 +69,8 @@ public void beginNextTransaction() throws StreamingException { } @Override - public void commitWithPartitions(Set<String> partitions) throws StreamingException { + public void commit(Set<String> partitions, String key, String value) + throws StreamingException { checkIsClosed(); boolean success = false; try { diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 1c9e43fad1..50433b6243 100644 --- streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -438,6 +438,43 @@ public void testGetDeltaPath() throws Exception { + "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000")); } + @Test + public void testCommitWithKeyValue() throws Exception { + queryTable(driver, "drop table if exists default.keyvalue"); + queryTable(driver, "create table default.keyvalue (a string, b string) stored as orc " + + "TBLPROPERTIES('transactional'='true')"); + queryTable(driver, "insert into default.keyvalue values('foo','bar')"); + queryTable(driver, "ALTER TABLE default.keyvalue SET TBLPROPERTIES('_metamykey' = 'myvalue')"); + List<String> rs = queryTable(driver, "select * from default.keyvalue"); + Assert.assertEquals(1, rs.size()); + Assert.assertEquals("foo\tbar", rs.get(0)); + StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + HiveStreamingConnection connection = HiveStreamingConnection.newBuilder() + .withDatabase("Default") + 
.withTable("keyvalue") + .withAgentInfo("UT_" + Thread.currentThread().getName()) + .withTransactionBatchSize(2) + .withRecordWriter(wr) + .withHiveConf(conf) + .connect(); + connection.beginTransaction(); + connection.write("a1,b2".getBytes()); + connection.write("a3,b4".getBytes()); + connection.commitTransaction(null, "_metamykey", "myvalue"); + connection.close(); + + rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from default.keyvalue order by ROW__ID"); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("keyvalue/delta_0000002_0000003/bucket_00000")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).endsWith("keyvalue/delta_0000002_0000003/bucket_00000")); + + rs = queryTable(driver, "SHOW TBLPROPERTIES default.keyvalue('_metamykey')"); + Assert.assertEquals(rs.get(0), "_metamykey\tmyvalue", rs.get(0)); + } + @Test public void testConnectionWithWriteId() throws Exception { queryTable(driver, "drop table if exists default.writeidconnection"); @@ -1139,7 +1176,7 @@ public void testAddPartitionWithWriteId() throws Exception { Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised"); } catch (NoSuchObjectException e) {} - transactionConnection.commitTransaction(partitions); // Ensure partition is present Partition p = msClient.getPartition(dbName, tblName, newPartVals); @@ -1217,7 +1254,7 @@ public void testAddDynamicPartitionWithWriteId() throws Exception { partitionsOne.addAll(partitionsTwo); Set<String> allPartitions = partitionsOne; - transactionConnection.commitTransactionWithPartition(allPartitions); + transactionConnection.commitTransaction(allPartitions); // Ensure partition is present for (String partition : allPartitions) {