diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index aa64863..7b86b0c 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; @@ -81,6 +82,10 @@ public void testTxnRange() throws Exception { validTxns.isTxnRangeCommitted(1L, 3L)); List<Long> tids = client.openTxns("me", 5).getTxn_ids(); + HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5); + Assert.assertEquals(0, rsp.getNosuch().size()); + Assert.assertEquals(0, rsp.getAborted().size()); + client.rollbackTxn(1L); client.commitTxn(2L); client.commitTxn(3L); diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 97af887..ea245c7 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -542,6 +542,16 @@ struct HeartbeatRequest { 2: optional i64 txnid } +struct HeartbeatTxnRangeRequest { + 1: required i64 min, + 2: required i64 max +} + +struct HeartbeatTxnRangeResponse { + 1: required set<i64> aborted, + 2: required set<i64> nosuch +} + struct CompactionRequest { 1: required string dbname, 2: required string tablename, @@ -998,6 +1008,7 @@ service ThriftHiveMetastore extends fb303.FacebookService void unlock(1:UnlockRequest rqst) throws (1:NoSuchLockException o1, 2:TxnOpenException o2) ShowLocksResponse show_locks(1:ShowLocksRequest rqst) void 
heartbeat(1:HeartbeatRequest ids) throws (1:NoSuchLockException o1, 2:NoSuchTxnException o2, 3:TxnAbortedException o3) + HeartbeatTxnRangeResponse heartbeat_txn_range(1:HeartbeatTxnRangeRequest txns) void compact(1:CompactionRequest rqst) ShowCompactResponse show_compact(1:ShowCompactRequest rqst) } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index d5c7ba7..b419c7d 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -81,6 +81,8 @@ import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; @@ -4865,6 +4867,16 @@ public void heartbeat(HeartbeatRequest ids) } @Override + public HeartbeatTxnRangeResponse heartbeat_txn_range(HeartbeatTxnRangeRequest rqst) + throws TException { + try { + return getTxnHandler().heartbeatTxnRange(rqst); + } catch (MetaException e) { + throw new TException(e); + } + } + + @Override public void compact(CompactionRequest rqst) throws TException { try { getTxnHandler().compact(rqst); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 0550589..921d109 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -72,6 +72,8 @@ import 
org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.Index; @@ -1604,6 +1606,13 @@ public void heartbeat(long txnid, long lockid) } @Override + public HeartbeatTxnRangeResponse heartbeatTxnRange(long min, long max) + throws NoSuchTxnException, TxnAbortedException, TException { + HeartbeatTxnRangeRequest rqst = new HeartbeatTxnRangeRequest(min, max); + return client.heartbeat_txn_range(rqst); + } + + @Override public void compact(String dbname, String tableName, String partitionName, CompactionType type) throws TException { CompactionRequest cr = new CompactionRequest(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 47c49aa..aa6eea5 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.NoSuchLockException; @@ -1230,6 +1231,18 @@ public void heartbeat(long txnid, long lockid) TException; /** + * Send heartbeats for a range of transactions. 
This is for the streaming ingest client that + * will have many transactions open at once. Everyone else should use + * {@link #heartbeat(long, long)}. + * @param min minimum transaction id to heartbeat, inclusive + * @param max maximum transaction id to heartbeat, inclusive + * @return a pair of lists that tell which transactions in the list did not exist (they may + * have already been closed) and which were aborted. + * @throws TException + */ + public HeartbeatTxnRangeResponse heartbeatTxnRange(long min, long max) throws TException; + + /** * Send a request to compact a table or partition. This will not block until the compaction is * complete. It will instead put a request on the queue for that table or partition to be * compacted. No checking is done on the dbname, tableName, or partitionName to make sure they diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2cc1157..8367021 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -600,6 +600,43 @@ public void heartbeat(HeartbeatRequest ids) } } + public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException { + try { + Connection dbConn = getDbConn(); + HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse(); + Set<Long> nosuch = new HashSet<Long>(); + Set<Long> aborted = new HashSet<Long>(); + rsp.setNosuch(nosuch); + rsp.setAborted(aborted); + try { + for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { + try { + heartbeatTxn(dbConn, txn); + } catch (NoSuchTxnException e) { + nosuch.add(txn); + } catch (TxnAbortedException e) { + aborted.add(txn); + } + } + return rsp; + } catch (SQLException e) { + try { + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e1) { + } + detectDeadlock(e, "heartbeatTxnRange"); + throw new 
MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (DeadlockException e) { + return heartbeatTxnRange(rqst); + } + } + public void compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. try { diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 71021e3..8287c60 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -889,6 +889,48 @@ public void testHeartbeatLock() throws Exception { } @Test + public void heartbeatTxnRange() throws Exception { + long txnid = openTxn(); + assertEquals(1, txnid); + txnid = openTxn(); + txnid = openTxn(); + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); + assertEquals(0, rsp.getAborted().size()); + assertEquals(0, rsp.getNosuch().size()); + } + + @Test + public void heartbeatTxnRangeOneCommitted() throws Exception { + long txnid = openTxn(); + assertEquals(1, txnid); + txnHandler.commitTxn(new CommitTxnRequest(1)); + txnid = openTxn(); + txnid = openTxn(); + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); + assertEquals(1, rsp.getNosuchSize()); + Long txn = rsp.getNosuch().iterator().next(); + assertEquals(1L, (long)txn); + assertEquals(0, rsp.getAborted().size()); + } + + @Test + public void heartbeatTxnRangeOneAborted() throws Exception { + long txnid = openTxn(); + assertEquals(1, txnid); + txnid = openTxn(); + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(3)); + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); + assertEquals(1, rsp.getAbortedSize()); + Long txn = rsp.getAborted().iterator().next(); 
+ assertEquals(3L, (long)txn); + assertEquals(0, rsp.getNosuch().size()); + } + + @Test public void testLockTimeout() throws Exception { long timeout = txnHandler.setTimeout(1); LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");