diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 79e95cf..e5d0780 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1102,7 +1102,7 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId()); } // We are writing to tables in an ACID compliant way, so we need to open a transaction - txnMgr.openTxn(userFromUGI); + txnMgr.openTxn(ctx, userFromUGI); initiatingTransaction = true; } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 203eae5..af2b011 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -135,7 +135,12 @@ void setHiveConf(HiveConf conf) { } @Override - public long openTxn(String user) throws LockException { + public long openTxn(Context ctx, String user) throws LockException { + return openTxn(ctx, user, 0); + } + + @VisibleForTesting + long openTxn(Context ctx, String user, long delay) throws LockException { //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call //whenever it chooses init(); @@ -146,6 +151,7 @@ public long openTxn(String user) throws LockException { txnId = client.openTxn(user); statementId = 0; LOG.debug("Opened " + JavaUtils.txnIdToString(txnId)); + ctx.setHeartbeater(startHeartbeat(delay)); return txnId; } catch (TException e) { throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED); @@ -164,7 +170,7 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { try { - acquireLocksWithHeartbeatDelay(plan, ctx, username, 0); + acquireLocks(plan, ctx, username, true); } catch(LockException e) { if(e.getCause() instanceof TxnAbortedException) { @@ -182,7 +188,14 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo */ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException { init(); - // Make sure we've built the lock manager + + // Start heartbeat for read-only queries which don't open transactions. + // For those that require transactions, the heartbeat has already been started in openTxn. + if (!isTxnOpen()) { + ctx.setHeartbeater(startHeartbeat(0)); + } + + // Make sure we've built the lock manager getLockManager(); boolean atLeastOneLock = false; @@ -337,6 +350,13 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB List locks = new ArrayList(1); LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks); + + // Stop the heartbeat as soon as we realized there's no lock required for read-only queries, as it's not needed + if (!isTxnOpen() && lockState == null) { + stopHeartbeat(); + ctx.setHeartbeater(null); + } + ctx.setHiveLocks(locks); return lockState; } @@ -347,22 +367,13 @@ private static Table getTable(WriteEntity we) { } return t; } - /** - * @param delay time to delay for first heartbeat - */ - @VisibleForTesting - void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { - LockState ls = acquireLocks(plan, ctx, username, true); - if (ls != null) { // If there's no lock, we don't need to do heartbeat - ctx.setHeartbeater(startHeartbeat(delay)); - } - } - @Override public void releaseLocks(List hiveLocks) throws LockException { if (lockMgr != null) { - stopHeartbeat(); + if (isTxnOpen()) { + stopHeartbeat(); + } lockMgr.releaseLocks(hiveLocks); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index f001f59..24fbd9a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -49,7 +49,7 @@ private HiveLockManager lockMgr; @Override - public long openTxn(String user) throws LockException { + public long openTxn(Context ctx, String user) throws LockException { // No-op return 0L; } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 5b9ad60..ce220a2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -38,11 +38,12 @@ /** * Open a new transaction. + * @param ctx Context for this query * @param user Hive user who is opening this transaction. * @return The new transaction id * @throws LockException if a transaction is already open. */ - long openTxn(String user) throws LockException; + long openTxn(Context ctx, String user) throws LockException; /** * Get the lock manager. This must be used rather than instantiating an @@ -55,7 +56,7 @@ /** * Acquire all of the locks needed by a query. If used with a query that - * requires transactions, this should be called after {@link #openTxn(String)}. + * requires transactions, this should be called after {@link #openTxn(Context, String)}. * A list of acquired locks will be stored in the * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}. diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 460bad5..a541b9d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -139,7 +139,7 @@ public void testJoin() throws Exception { public void testSingleWriteTable() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -155,7 +155,7 @@ public void testSingleWriteTable() throws Exception { public void testSingleWritePartition() throws Exception { WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -170,7 +170,7 @@ public void testSingleWritePartition() throws Exception { public void testWriteDynamicPartition() throws Exception { WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -213,7 +213,7 @@ private void runReaper() throws Exception { public void testExceptions() throws Exception { addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("NicholasII"); + ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); LockException exception = null; @@ -227,7 +227,7 @@ public void testExceptions() throws Exception { Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); exception = null; - txnMgr.openTxn("AlexanderIII"); + ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); try { @@ -246,14 +246,11 @@ public void testLockTimeout() throws Exception { QueryPlan qp = new MockQueryPlan(this); //make sure it works with nothing to expire expireLocks(txnMgr, 0); - //create a few read locks, all on the same resource - for(int i = 0; i < 5; i++) { - ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat - } - expireLocks(txnMgr, 5); + // Specify a long enough delay for the first heartbeat + ((DbTxnManager) txnMgr).openTxn(ctx, "PeterI", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 50); //create a lot of locks for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { - ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat + ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // Heartbeat hasn't started yet } expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); } @@ -275,7 +272,7 @@ public void testReadWrite() throws Exception { addPartitionInput(t); WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -290,7 +287,7 @@ public void testReadWrite() throws Exception { public void testUpdate() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -305,7 +302,7 @@ public void testUpdate() throws Exception { public void testDelete() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -320,7 +317,7 @@ public void testDelete() throws Exception { public void testRollback() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -404,7 +401,7 @@ public void testHeartbeater() throws Exception { QueryPlan qp = new MockQueryPlan(this); // Case 1: If there's no delay for the heartbeat, txn should be able to commit - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started.. runReaper(); try { @@ -417,10 +414,10 @@ public void testHeartbeater() throws Exception { // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance, // then txt should be able to commit - txnMgr.openTxn("tom"); // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT - ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom", + ((DbTxnManager) txnMgr).openTxn(ctx, "tom", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2); + ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "tom", true); runReaper(); try { txnMgr.commitTxn(); @@ -433,8 +430,8 @@ public void testHeartbeater() throws Exception { // Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper, // then the txn will time out and be aborted. // Here we just don't send the heartbeat at all - an infinite delay. - txnMgr.openTxn("jerry"); // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT + ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); @@ -445,6 +442,17 @@ public void testHeartbeater() throws Exception { } Assert.assertNotNull("Txn should have been aborted", exception); Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + + // Case 4: Read-only query doesn't open transactions, but we will trigger heartbeat if there's lock required. + txnMgr.acquireLocks(qp, ctx, "ted"); + List locks = ctx.getHiveLocks(); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); + Assert.assertTrue(ctx.getHeartbeater() != null); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); } @Before diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 637a01a..c0e21cd 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -107,7 +107,7 @@ public void testLocksInSubquery() throws Exception { checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); - txnMgr.openTxn("one"); + txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -116,7 +116,7 @@ public void testLocksInSubquery() throws Exception { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); - txnMgr.openTxn("one"); + txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -125,7 +125,7 @@ public void testLocksInSubquery() throws Exception { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); - txnMgr.openTxn("three"); + txnMgr.openTxn(ctx, "three"); txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -227,7 +227,7 @@ public void lockConflictDbTable() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6"); checkCmdOnDriver(cpr); - txnMgr.openTxn("Fifer"); + txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); @@ -253,12 +253,12 @@ public void updateSelectUpdate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("delete from T8 where b = 89"); checkCmdOnDriver(cpr); - txnMgr.openTxn("Fifer"); + txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8 checkCmdOnDriver(cpr); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("Fiddler"); + txnMgr2.openTxn(ctx, "Fiddler"); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer @@ -662,7 +662,7 @@ public void checkExpectedLocks2() throws Exception { "clustered by (na) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')")); checkCmdOnDriver(driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')")); checkCmdOnDriver(driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')")); - txnMgr.openTxn("T1"); + txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr, true); @@ -675,7 +675,7 @@ public void checkExpectedLocks2() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_not_acid", "np=doh", locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)")); LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr2, true); @@ -899,12 +899,12 @@ public void testWriteSetTracking1() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); - txnMgr.openTxn("Nicholas"); + txnMgr.openTxn(ctx, "Nicholas"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.commitTxn(); - txnMgr2.openTxn("Alexandra"); + txnMgr2.openTxn(ctx, "Alexandra"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas"); txnMgr2.commitTxn(); @@ -928,10 +928,10 @@ public void testWriteSetTracking2() throws Exception { checkCmdOnDriver(cpr); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr.openTxn("Peter"); + txnMgr.openTxn(ctx, "Peter"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter"); - txnMgr2.openTxn("Catherine"); + txnMgr2.openTxn(ctx, "Catherine"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); //note that "update" uses dynamic partitioning thus lock is on the table not partition @@ -953,8 +953,8 @@ public void testWriteSetTracking3() throws Exception { checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId = txnMgr.openTxn("Known"); - long txnId2 = txnMgr2.openTxn("Unknown"); + long txnId = txnMgr.openTxn(ctx, "Known"); + long txnId2 = txnMgr2.openTxn(ctx, "Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); @@ -1003,7 +1003,7 @@ public void testWriteSetTracking4() throws Exception { "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - txnMgr.openTxn("Long Running"); + txnMgr.openTxn(ctx, "Long Running"); checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); List locks = getLocks(txnMgr); @@ -1014,7 +1014,7 @@ public void testWriteSetTracking4() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("Short Running"); + txnMgr2.openTxn(ctx, "Short Running"); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running"); locks = getLocks(txnMgr); @@ -1031,7 +1031,7 @@ public void testWriteSetTracking4() throws Exception { //Short Running updated nothing, so we expect 0 rows in WRITE_SET Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); - txnMgr2.openTxn("T3"); + txnMgr2.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3"); locks = getLocks(txnMgr); @@ -1080,8 +1080,8 @@ public void testWriteSetTracking5() throws Exception { checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr.openTxn("Known"); - long txnId = txnMgr2.openTxn("Unknown"); + txnMgr.openTxn(ctx, "Known"); + long txnId = txnMgr2.openTxn(ctx, "Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); @@ -1119,7 +1119,7 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("Horton"); + txnMgr2.openTxn(ctx, "Horton"); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -1151,7 +1151,7 @@ public void testWriteSetTracking7() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //test with predicates such that partition pruning works - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1159,7 +1159,7 @@ public void testWriteSetTracking7() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1199,7 +1199,7 @@ public void testWriteSetTracking7() throws Exception { "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4 - txnMgr2.openTxn("T5"); + txnMgr2.openTxn(ctx, "T5"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); @@ -1208,7 +1208,7 @@ public void testWriteSetTracking7() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T6"); + txnMgr.openTxn(ctx, "T6"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); locks = getLocks(txnMgr); @@ -1256,7 +1256,7 @@ public void testWriteSetTracking8() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1265,7 +1265,7 @@ public void testWriteSetTracking8() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1310,7 +1310,7 @@ public void testWriteSetTracking9() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1319,7 +1319,7 @@ public void testWriteSetTracking9() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1370,7 +1370,7 @@ public void testWriteSetTracking10() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1379,7 +1379,7 @@ public void testWriteSetTracking10() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1432,7 +1432,7 @@ public void testWriteSetTracking11() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1441,7 +1441,7 @@ public void testWriteSetTracking11() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); @@ -1598,7 +1598,7 @@ private void testMerge3Way(boolean cc) throws Exception { "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)")); - long txnId1 = txnMgr.openTxn("T1"); + long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 @@ -1614,7 +1614,7 @@ private void testMerge3Way(boolean cc) throws Exception { //start concurrent txn DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId2 = txnMgr2.openTxn("T2"); + long txnId2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 @@ -1806,7 +1806,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); checkCmdOnDriver(driver.run("create table source (a int, b int)")); - long txnid1 = txnMgr.openTxn("T1"); + long txnid1 = txnMgr.openTxn(ctx, "T1"); if(causeConflict) { checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); } @@ -1827,7 +1827,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn("T2"); + long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + "on t.a=s.a " + "when matched then delete " + @@ -1903,7 +1903,7 @@ public void testDynamicPartitionInsert() throws Exception { checkCmdOnDriver(driver.run("create table target (a int, b int) " + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')")); - long txnid1 = txnMgr.openTxn("T1"); + long txnid1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr, true); @@ -1935,7 +1935,7 @@ public void testDynamicPartitionInsert() throws Exception { TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1))); - long txnid2 = txnMgr.openTxn("T1"); + long txnid2 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); locks = getLocks(txnMgr, true); @@ -1974,7 +1974,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); - long txnId1 = txnMgr.openTxn("T1"); + long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr, true); @@ -1984,7 +1984,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn("T2"); + long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + "on target.p=source.p1 and target.a=source.a1 " + "when matched then update set b = 11 " +