diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 79e95cf..e5d0780 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1102,7 +1102,7 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId()); } // We are writing to tables in an ACID compliant way, so we need to open a transaction - txnMgr.openTxn(userFromUGI); + txnMgr.openTxn(ctx, userFromUGI); initiatingTransaction = true; } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 203eae5..dcf32b8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -135,7 +135,12 @@ void setHiveConf(HiveConf conf) { } @Override - public long openTxn(String user) throws LockException { + public long openTxn(Context ctx, String user) throws LockException { + return openTxn(ctx, user, 0); + } + + @VisibleForTesting + long openTxn(Context ctx, String user, long delay) throws LockException { //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call //whenever it chooses init(); @@ -146,6 +151,7 @@ public long openTxn(String user) throws LockException { txnId = client.openTxn(user); statementId = 0; LOG.debug("Opened " + JavaUtils.txnIdToString(txnId)); + ctx.setHeartbeater(startHeartbeat(delay)); return txnId; } catch (TException e) { throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED); @@ -164,7 +170,7 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { try { - acquireLocksWithHeartbeatDelay(plan, ctx, username, 0); + acquireLocks(plan, ctx, username, true, true); } catch(LockException e) { if(e.getCause() instanceof TxnAbortedException) { @@ -177,10 +183,11 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo /** * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)} - * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING + * @param isBlocking if false, the method will return immediately and not start a heartbeat; thus the locks may be in LockState.WAITING + * @param startHeartbeat if false, no heartbeat will be started (for R/O queries) * @return null if no locks were needed */ - LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException { + LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking, boolean startHeartbeat) throws LockException { init(); // Make sure we've built the lock manager getLockManager(); @@ -338,6 +345,11 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB List locks = new ArrayList(1); LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks); ctx.setHiveLocks(locks); + if (lockState != null && !isTxnOpen() && startHeartbeat) { + // Start heartbeat for read-only queries which don't open transactions but requires locks. + // For those that require transactions, the heartbeat has already been started in openTxn. + ctx.setHeartbeater(startHeartbeat(0)); + } return lockState; } private static Table getTable(WriteEntity we) { @@ -347,17 +359,6 @@ private static Table getTable(WriteEntity we) { } return t; } - /** - * @param delay time to delay for first heartbeat - */ - @VisibleForTesting - void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { - LockState ls = acquireLocks(plan, ctx, username, true); - if (ls != null) { // If there's no lock, we don't need to do heartbeat - ctx.setHeartbeater(startHeartbeat(delay)); - } - } - @Override public void releaseLocks(List hiveLocks) throws LockException { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index f001f59..24fbd9a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -49,7 +49,7 @@ private HiveLockManager lockMgr; @Override - public long openTxn(String user) throws LockException { + public long openTxn(Context ctx, String user) throws LockException { // No-op return 0L; } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 5b9ad60..ce220a2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -38,11 +38,12 @@ /** * Open a new transaction. + * @param ctx Context for this query * @param user Hive user who is opening this transaction. * @return The new transaction id * @throws LockException if a transaction is already open. */ - long openTxn(String user) throws LockException; + long openTxn(Context ctx, String user) throws LockException; /** * Get the lock manager. This must be used rather than instantiating an @@ -55,7 +56,7 @@ /** * Acquire all of the locks needed by a query. If used with a query that - * requires transactions, this should be called after {@link #openTxn(String)}. + * requires transactions, this should be called after {@link #openTxn(Context, String)}. * A list of acquired locks will be stored in the * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}. diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 460bad5..d5b1f88 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -139,7 +139,7 @@ public void testJoin() throws Exception { public void testSingleWriteTable() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -155,7 +155,7 @@ public void testSingleWriteTable() throws Exception { public void testSingleWritePartition() throws Exception { WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -170,7 +170,7 @@ public void testSingleWritePartition() throws Exception { public void testWriteDynamicPartition() throws Exception { WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -213,7 +213,7 @@ private void runReaper() throws Exception { public void testExceptions() throws Exception { addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("NicholasII"); + ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); LockException exception = null; @@ -227,7 +227,7 @@ public void testExceptions() throws Exception { Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); exception = null; - txnMgr.openTxn("AlexanderIII"); + ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); try { @@ -245,26 +245,30 @@ public void testLockTimeout() throws Exception { addPartitionInput(newTable(true)); QueryPlan qp = new MockQueryPlan(this); //make sure it works with nothing to expire - expireLocks(txnMgr, 0); + expireLocks(txnMgr, 0, true); //create a few read locks, all on the same resource for(int i = 0; i < 5; i++) { - ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat + ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true, false); // No heartbeat } - expireLocks(txnMgr, 5); + expireLocks(txnMgr, 5, true); //create a lot of locks for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { - ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat + ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true, false); // No heartbeat } - expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); + expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17, true); } - private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception { + private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore, boolean locksExpire) throws Exception { DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager(); ShowLocksResponse resp = lockManager.getLocks(); Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size()); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); resp = lockManager.getLocks(); - Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size()); + if (locksExpire) { + Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size()); + } else { + Assert.assertEquals("No lock should expire because there is heartbeating", numLocksBefore, resp.getLocks().size()); + } } @Test @@ -275,7 +279,7 @@ public void testReadWrite() throws Exception { addPartitionInput(t); WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -290,7 +294,7 @@ public void testReadWrite() throws Exception { public void testUpdate() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -305,7 +309,7 @@ public void testUpdate() throws Exception { public void testDelete() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -320,7 +324,7 @@ public void testDelete() throws Exception { public void testRollback() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); QueryPlan qp = new MockQueryPlan(this); - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); @@ -404,7 +408,7 @@ public void testHeartbeater() throws Exception { QueryPlan qp = new MockQueryPlan(this); // Case 1: If there's no delay for the heartbeat, txn should be able to commit - txnMgr.openTxn("fred"); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started.. runReaper(); try { @@ -417,10 +421,10 @@ public void testHeartbeater() throws Exception { // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance, // then txt should be able to commit - txnMgr.openTxn("tom"); // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT - ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom", + ((DbTxnManager) txnMgr).openTxn(ctx, "tom", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2); + ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "tom", true, true); runReaper(); try { txnMgr.commitTxn(); @@ -433,9 +437,9 @@ public void testHeartbeater() throws Exception { // Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper, // then the txn will time out and be aborted. // Here we just don't send the heartbeat at all - an infinite delay. - txnMgr.openTxn("jerry"); // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT - ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true); + ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); + ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true, true); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); try { @@ -445,6 +449,11 @@ public void testHeartbeater() throws Exception { } Assert.assertNotNull("Txn should have been aborted", exception); Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + + // Case 4: Read-only query doesn't open transactions, but we will trigger heartbeat if there's lock required. + ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "peter", true, true); // start heartbeat + // Since we have heartbeat being sent, the locks won't expire + expireLocks(txnMgr, 1, false); } @Before diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 3c9358d..7364fce 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -107,7 +107,7 @@ public void testLocksInSubquery() throws Exception { checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); - txnMgr.openTxn("one"); + txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -116,7 +116,7 @@ public void testLocksInSubquery() throws Exception { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); - txnMgr.openTxn("one"); + txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -125,7 +125,7 @@ public void testLocksInSubquery() throws Exception { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); - txnMgr.openTxn("three"); + txnMgr.openTxn(ctx, "three"); txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -199,7 +199,7 @@ public void basicBlocking() throws Exception { cpr = driver.compileAndRespond("drop table if exists T6"); checkCmdOnDriver(cpr); //tries to get X lock on T1 and gets Waiting state - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); + LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false, true); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); @@ -227,12 +227,12 @@ public void lockConflictDbTable() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6"); checkCmdOnDriver(cpr); - txnMgr.openTxn("Fifer"); + txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //txnMgr2.openTxn("Fiddler"); - ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7 + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false, true);//gets SS lock on T7 List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks); @@ -253,15 +253,15 @@ public void updateSelectUpdate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("delete from T8 where b = 89"); checkCmdOnDriver(cpr); - txnMgr.openTxn("Fifer"); + txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8 checkCmdOnDriver(cpr); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("Fiddler"); + txnMgr2.openTxn(ctx, "Fiddler"); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); - ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer + ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false, true);//waits for SS lock on T8 from fifer List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks); @@ -330,7 +330,7 @@ public void testLockBlockedBy() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); cpr = driver.compileAndRespond("drop table TAB_BLOCKED"); checkCmdOnDriver(cpr); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false, true);//make non-blocking locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); @@ -592,7 +592,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)"); checkCmdOnDriver(cpr); - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false, true); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks); @@ -602,7 +602,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)"); checkCmdOnDriver(cpr); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false, true); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks); @@ -612,7 +612,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)"); checkCmdOnDriver(cpr); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false, true); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks); @@ -622,7 +622,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)"); checkCmdOnDriver(cpr); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false, true); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks); @@ -632,7 +632,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1"); checkCmdOnDriver(cpr); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false, true); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); @@ -642,7 +642,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1"); checkCmdOnDriver(cpr); - lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false, true); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks);//https://issues.apache.org/jira/browse/HIVE-13212 @@ -662,7 +662,7 @@ public void checkExpectedLocks2() throws Exception { "clustered by (na) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')")); checkCmdOnDriver(driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')")); checkCmdOnDriver(driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')")); - txnMgr.openTxn("T1"); + txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr, true); @@ -675,9 +675,9 @@ public void checkExpectedLocks2() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_not_acid", "np=doh", locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)")); - LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); + LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false, true); locks = getLocks(txnMgr2, true); Assert.assertEquals("Unexpected lock count", 7, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_acid", null, locks); @@ -899,12 +899,12 @@ public void testWriteSetTracking1() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); - txnMgr.openTxn("Nicholas"); + txnMgr.openTxn(ctx, "Nicholas"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.commitTxn(); - txnMgr2.openTxn("Alexandra"); + txnMgr2.openTxn(ctx, "Alexandra"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas"); txnMgr2.commitTxn(); @@ -928,10 +928,10 @@ public void testWriteSetTracking2() throws Exception { checkCmdOnDriver(cpr); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr.openTxn("Peter"); + txnMgr.openTxn(ctx, "Peter"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter"); - txnMgr2.openTxn("Catherine"); + txnMgr2.openTxn(ctx, "Catherine"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); //note that "update" uses dynamic partitioning thus lock is on the table not partition @@ -953,15 +953,15 @@ public void testWriteSetTracking3() throws Exception { checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId = txnMgr.openTxn("Known"); - long txnId2 = txnMgr2.openTxn("Unknown"); + long txnId = txnMgr.openTxn(ctx, "Known"); + long txnId2 = txnMgr2.openTxn(ctx, "Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); - ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false, true); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); @@ -1003,7 +1003,7 @@ public void testWriteSetTracking4() throws Exception { "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - txnMgr.openTxn("Long Running"); + txnMgr.openTxn(ctx, "Long Running"); checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); List locks = getLocks(txnMgr); @@ -1014,7 +1014,7 @@ public void testWriteSetTracking4() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("Short Running"); + txnMgr2.openTxn(ctx, "Short Running"); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running"); locks = getLocks(txnMgr); @@ -1031,7 +1031,7 @@ public void testWriteSetTracking4() throws Exception { //Short Running updated nothing, so we expect 0 rows in WRITE_SET Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); - txnMgr2.openTxn("T3"); + txnMgr2.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3"); locks = getLocks(txnMgr); @@ -1080,15 +1080,15 @@ public void testWriteSetTracking5() throws Exception { checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr.openTxn("Known"); - long txnId = txnMgr2.openTxn("Unknown"); + txnMgr.openTxn(ctx, "Known"); + long txnId = txnMgr2.openTxn(ctx, "Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); - ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false, true); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); @@ -1119,7 +1119,7 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("Horton"); + txnMgr2.openTxn(ctx, "Horton"); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -1151,7 +1151,7 @@ public void testWriteSetTracking7() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //test with predicates such that partition pruning works - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1159,9 +1159,9 @@ public void testWriteSetTracking7() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false, true); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); @@ -1199,7 +1199,7 @@ public void testWriteSetTracking7() throws Exception { "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4 - txnMgr2.openTxn("T5"); + txnMgr2.openTxn(ctx, "T5"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); @@ -1208,9 +1208,9 @@ public void testWriteSetTracking7() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T6"); + txnMgr.openTxn(ctx, "T6"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false, true); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 4, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); @@ -1256,7 +1256,7 @@ public void testWriteSetTracking8() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1265,9 +1265,9 @@ public void testWriteSetTracking8() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false, true); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); @@ -1310,7 +1310,7 @@ public void testWriteSetTracking9() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1319,9 +1319,9 @@ public void testWriteSetTracking9() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false, true); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); @@ -1370,7 +1370,7 @@ public void testWriteSetTracking10() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1379,9 +1379,9 @@ public void testWriteSetTracking10() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false, true); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); @@ -1432,7 +1432,7 @@ public void testWriteSetTracking11() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn("T2"); + txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1441,11 +1441,11 @@ public void testWriteSetTracking11() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn("T3"); + txnMgr.openTxn(ctx, "T3"); checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false, true); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false, true); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks); @@ -1598,7 +1598,7 @@ private void testMerge3Way(boolean cc) throws Exception { "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)")); - long txnId1 = txnMgr.openTxn("T1"); + long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 @@ -1614,12 +1614,12 @@ private void testMerge3Way(boolean cc) throws Exception { //start concurrent txn DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId2 = txnMgr2.openTxn("T2"); + long txnId2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 - txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false, true); locks = getLocks(txnMgr2, true); Assert.assertEquals("Unexpected lock count", 10, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); @@ -1806,7 +1806,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); checkCmdOnDriver(driver.run("create table source (a int, b int)")); - long txnid1 = txnMgr.openTxn("T1"); + long txnid1 = txnMgr.openTxn(ctx, "T1"); if(causeConflict) { checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); } @@ -1827,12 +1827,12 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn("T2"); + long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + "on t.a=s.a " + "when matched then delete " + "when not matched then insert values(s.a,s.b)")); - txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false, true); locks = getLocks(txnMgr, true); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -1903,7 +1903,7 @@ public void testDynamicPartitionInsert() throws Exception { checkCmdOnDriver(driver.run("create table target (a int, b int) " + "partitioned by (p int, q int) clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')")); - long txnid1 = txnMgr.openTxn("T1"); + long txnid1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr, true); @@ -1935,7 +1935,7 @@ public void testDynamicPartitionInsert() throws Exception { TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1))); - long txnid2 = txnMgr.openTxn("T1"); + long txnid2 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); locks = getLocks(txnMgr, true); @@ -1974,7 +1974,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); - long txnId1 = txnMgr.openTxn("T1"); + long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr, true); @@ -1984,12 +1984,12 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn("T2"); + long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + "on target.p=source.p1 and target.a=source.a1 " + "when matched then update set b = 11 " + "when not matched then insert values(a1,b1,p1,q1)")); - txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false, true); locks = getLocks(txnMgr, true); Assert.assertEquals("Unexpected lock count", 7, locks.size()); /**