diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d789ed0588..832f0d1f89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -196,6 +196,10 @@ private HiveTxnManager queryTxnMgr; private RuntimeStatsSource runtimeStatsSource; + // Boolean to store information about whether valid txn list was generated + // for current query. + private boolean validTxnListsGenerated; + private CacheUsage cacheUsage; private CacheEntry usedCacheEntry; @@ -613,6 +617,9 @@ public void run() { tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree); sem = SemanticAnalyzerFactory.get(queryState, tree); openTransaction(); + // TODO: Lock acquisition should be moved before this method call + // when we want to implement lock-based concurrency control + generateValidTxnList(); sem.analyze(tree, ctx); hookCtx.update(sem); @@ -620,6 +627,9 @@ public void run() { } else { sem = SemanticAnalyzerFactory.get(queryState, tree); openTransaction(); + // TODO: Lock acquisition should be moved before this method call + // when we want to implement lock-based concurrency control + generateValidTxnList(); sem.analyze(tree, ctx); } LOG.info("Semantic Analysis Completed"); @@ -769,6 +779,24 @@ private void openTransaction() throws LockException, CommandProcessorResponse { } } + private void generateValidTxnList() throws LockException { + // Record current valid txn list that will be used throughout the query + // compilation and processing. We only do this if 1) a transaction + // was already opened and 2) the list has not been recorded yet, + // e.g., by an explicit open transaction command. + validTxnListsGenerated = false; + String currentTxnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + if (queryTxnMgr.isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) { + try { + recordValidTxns(queryTxnMgr); + validTxnListsGenerated = true; + } catch (LockException e) { + LOG.error("Exception while acquiring valid txn list", e); + throw e; + } + } + } + private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { boolean shouldOpenImplicitTxn = !ctx.isExplainPlan(); //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 @@ -1360,8 +1388,10 @@ private void acquireLocks() throws CommandProcessorResponse { /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); - if (queryTxnMgr.recordSnapshot(plan)) { - recordValidTxns(queryTxnMgr); + // This check is for controlling the correctness of the current state + if (queryTxnMgr.recordSnapshot(plan) && !validTxnListsGenerated) { + throw new IllegalStateException("calling recordValidTxn() more than once in the same " + + JavaUtils.txnIdToString(queryTxnMgr.getCurrentTxnId())); } if (plan.hasAcidResourcesInQuery()) { recordValidWriteIds(queryTxnMgr); @@ -1519,11 +1549,21 @@ public CommandProcessorResponse run(String command, boolean alreadyCompiled) { @Override public CommandProcessorResponse compileAndRespond(String command) { + return compileAndRespond(command, false); + } + + public CommandProcessorResponse compileAndRespond(String command, boolean cleanupTxnList) { try { compileInternal(command, false); return createProcessorResponse(0); } catch (CommandProcessorResponse e) { return e; + } finally { + if (cleanupTxnList) { + // Valid txn list might be generated for a query compiled using this + // command, thus we need to reset it + conf.unset(ValidTxnList.VALID_TXNS_KEY); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index ba0d56ae37..1f533fcdd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -249,7 +249,7 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli DriverCleanup driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString()); boolean needsCleanup = true; try { - CommandProcessorResponse cpr = driver.compileAndRespond(query); + CommandProcessorResponse cpr = driver.compileAndRespond(query, true); if (cpr.getResponseCode() != 0) { throw new HiveException("Failed to compile query: " + cpr.getException()); } @@ -280,7 +280,7 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, "llap"); query = "select * from " + tableName; - cpr = driver.compileAndRespond(query); + cpr = driver.compileAndRespond(query, true); if(cpr.getResponseCode() != 0) { throw new HiveException("Failed to create temp table: "+cpr.getException()); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index d9a5feb386..abd2cb8f5b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -125,7 +125,7 @@ public void testMetadataOperationLocks() throws Exception { conf.setBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE, false); dropTable(new String[] {"T"}); checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)")); - checkCmdOnDriver(driver.compileAndRespond("insert into T values (1,2)")); + checkCmdOnDriver(driver.compileAndRespond("insert into T values (1,2)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -136,7 +136,7 @@ public void testMetadataOperationLocks() throws Exception { //simulate concurrent session HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("alter table T SET TBLPROPERTIES ('transactional'='true')")); + checkCmdOnDriver(driver.compileAndRespond("alter table T SET TBLPROPERTIES ('transactional'='true')", true)); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -154,7 +154,7 @@ public void testLocksInSubquery() throws Exception { checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); - checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); + checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -162,7 +162,7 @@ public void testLocksInSubquery() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks); txnMgr.rollbackTxn(); - checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); + checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -170,7 +170,7 @@ public void testLocksInSubquery() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks); txnMgr.rollbackTxn(); - checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); + checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -182,7 +182,7 @@ public void testLocksInSubquery() throws Exception { @Test public void createTable() throws Exception { dropTable(new String[] {"T"}); - CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)"); + CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -198,7 +198,7 @@ public void insertOverwriteCreate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T3(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2"); + cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -219,7 +219,7 @@ public void insertOverwritePartitionedCreate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T5(name string, age int, gpa double)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5"); + cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -238,13 +238,13 @@ public void basicBlocking() throws Exception { dropTable(new String[] {"T6"}); CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select a from T6"); + cpr = driver.compileAndRespond("select a from T6", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 List selectLocks = ctx.getHiveLocks(); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table if exists T6"); + cpr = driver.compileAndRespond("drop table if exists T6", true); checkCmdOnDriver(cpr); //tries to get X lock on T1 and gets Waiting state LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); @@ -271,12 +271,12 @@ public void lockConflictDbTable() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");//gets SS lock on T7 + cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6", true);//gets SS lock on T7 checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); + checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp", true)); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -294,16 +294,16 @@ public void updateSelectUpdate() throws Exception { dropTable(new String[] {"T8"}); CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("delete from T8 where b = 89"); + cpr = driver.compileAndRespond("delete from T8 where b = 89", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); checkCmdOnDriver(driver.run("start transaction")); - cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8 + cpr = driver.compileAndRespond("select a from T8", true);//gets S lock on T8 checkCmdOnDriver(cpr); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); - checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); + checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1", true)); ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -331,7 +331,7 @@ public void testLockRetryLimit() throws Exception { conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true); CommandProcessorResponse cpr = driver.run("create table T9(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select * from T9"); + cpr = driver.compileAndRespond("select * from T9", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Vincent Vega"); List locks = getLocks(); @@ -339,7 +339,7 @@ public void testLockRetryLimit() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table T9"); + cpr = driver.compileAndRespond("drop table T9", true); checkCmdOnDriver(cpr); try { txnMgr2.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield"); @@ -365,7 +365,7 @@ public void testLockBlockedBy() throws Exception { dropTable(new String[] {"TAB_BLOCKED"}); CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select * from TAB_BLOCKED"); + cpr = driver.compileAndRespond("select * from TAB_BLOCKED", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM"); List locks = getLocks(); @@ -373,7 +373,7 @@ public void testLockBlockedBy() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table TAB_BLOCKED"); + cpr = driver.compileAndRespond("drop table TAB_BLOCKED", true); checkCmdOnDriver(cpr); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking locks = getLocks(); @@ -395,22 +395,22 @@ public void testDummyTxnManagerOnAcidTable() throws Exception { // All DML should fail with DummyTxnManager on ACID table useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("select * from T10"); + cpr = driver.compileAndRespond("select * from T10", true); Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("insert into table T10 values (1, 2)"); + cpr = driver.compileAndRespond("insert into table T10 values (1, 2)", true); Assert.assertEquals(ErrorMsg.TXNMGR_NOT_ACID.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("update T10 set a=0 where b=1"); + cpr = driver.compileAndRespond("update T10 set a=0 where b=1", true); Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("delete from T10"); + cpr = driver.compileAndRespond("delete from T10", true); Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); @@ -631,7 +631,7 @@ public void checkExpectedLocks() throws Exception { cpr = driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)"); + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)", true); checkCmdOnDriver(cpr); LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); List locks = getLocks(); @@ -640,7 +640,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn();; - cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)"); + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -649,7 +649,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)"); + cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -658,7 +658,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)"); + cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -667,7 +667,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "_dummy_database", "_dummy_table", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1"); + cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -675,7 +675,7 @@ public void checkExpectedLocks() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); txnMgr.rollbackTxn(); - cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1"); + cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1", true); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); @@ -696,7 +696,7 @@ public void checkExpectedLocks2() throws Exception { checkCmdOnDriver(driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')")); checkCmdOnDriver(driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')")); txnMgr.openTxn(ctx, "T1"); - checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na")); + checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 6, locks.size()); @@ -709,7 +709,7 @@ public void checkExpectedLocks2() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "T2"); - checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)")); + checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)", true)); LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 8, locks.size()); @@ -727,7 +727,7 @@ public void checkExpectedLocks2() throws Exception { conf.setBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE, false); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr3.openTxn(ctx, "T3"); - checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='blah') values(7,8)")); + checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='blah') values(7,8)", true)); ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr3); Assert.assertEquals("Unexpected lock count", 10, locks.size()); @@ -803,31 +803,31 @@ public void testShowLocksFilterOptions() throws Exception { HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr1); - cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)"); + cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)", true); checkCmdOnDriver(cpr); txnMgr1.acquireLocks(driver.getPlan(), ctx, "Tom"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)"); + cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)", true); checkCmdOnDriver(cpr); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Jerry"); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr3); - cpr = driver.compileAndRespond("select * from db2.t15"); + cpr = driver.compileAndRespond("select * from db2.t15", true); checkCmdOnDriver(cpr); txnMgr3.acquireLocks(driver.getPlan(), ctx, "Donald"); HiveTxnManager txnMgr4 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr4); - cpr = driver.compileAndRespond("select * from db2.t16"); + cpr = driver.compileAndRespond("select * from db2.t16", true); checkCmdOnDriver(cpr); txnMgr4.acquireLocks(driver.getPlan(), ctx, "Hillary"); HiveTxnManager txnMgr5 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr5); - cpr = driver.compileAndRespond("select * from db2.t14"); + cpr = driver.compileAndRespond("select * from db2.t14", true); checkCmdOnDriver(cpr); txnMgr5.acquireLocks(driver.getPlan(), ctx, "Obama"); @@ -899,13 +899,13 @@ public void testWriteSetTracking1() throws Exception { "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); + checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); txnMgr.commitTxn(); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Alexandra"); txnMgr2.commitTxn(); } @@ -929,7 +929,7 @@ public void testWriteSetTracking2() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.openTxn(ctx, "Peter"); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter"); txnMgr2.openTxn(ctx, "Catherine"); List locks = getLocks(txnMgr); @@ -937,7 +937,7 @@ public void testWriteSetTracking2() throws Exception { //note that "update" uses dynamic partitioning thus lock is on the table not partition checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks); txnMgr.commitTxn(); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine"); txnMgr2.commitTxn(); } @@ -952,7 +952,7 @@ public void testWriteSetTracking3() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); long txnId = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); @@ -960,7 +960,7 @@ public void testWriteSetTracking3() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); long txnId2 = txnMgr2.getCurrentTxnId(); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here @@ -1008,7 +1008,7 @@ public void testWriteSetTracking4() throws Exception { checkCmdOnDriver(cpr); txnMgr.openTxn(ctx, "Long Running"); - checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); @@ -1019,7 +1019,7 @@ public void testWriteSetTracking4() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "Short Running"); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'", true));//no such partition txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running"); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1042,7 +1042,7 @@ public void testWriteSetTracking4() throws Exception { Assert.assertEquals( 0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); txnMgr2.openTxn(ctx, "T3"); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'", true));//pretend this partition exists txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3"); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1068,7 +1068,7 @@ public void testWriteSetTracking4() throws Exception { houseKeeper.run(); //since T3 overlaps with Long Running (still open) GC does nothing Assert.assertEquals(1, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1", true));//no rows match txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); writeIds = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr.getCurrentTxnId()), @@ -1103,12 +1103,12 @@ public void testWriteSetTracking5() throws Exception { txnMgr.openTxn(ctx, "Known"); long txnId = txnMgr2.openTxn(ctx, "Unknown"); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'", true)); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1139,14 +1139,14 @@ public void testWriteSetTracking6() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " + "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113")); + checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Works"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET")); locks = getLocks(txnMgr); @@ -1180,7 +1180,7 @@ public void testWriteSetTracking7() throws Exception { HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); //test with predicates such that partition pruning works - checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); + checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'", true)); long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1189,7 +1189,7 @@ public void testWriteSetTracking7() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); + checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'", true)); long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1231,7 +1231,7 @@ public void testWriteSetTracking7() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4 swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1", true)); long idTxnUpdate3 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); @@ -1241,7 +1241,7 @@ public void testWriteSetTracking7() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2", true)); long idTxnUpdate4 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); locks = getLocks(txnMgr); @@ -1292,7 +1292,7 @@ public void testWriteSetTracking8() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1", true)); long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1302,7 +1302,7 @@ public void testWriteSetTracking8() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'", true)); long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1350,7 +1350,7 @@ public void testWriteSetTracking9() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1", true)); long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1360,7 +1360,7 @@ public void testWriteSetTracking9() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2", true)); long idTxnDelete1 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1414,7 +1414,7 @@ public void testWriteSetTracking10() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2")); + checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1423,7 +1423,7 @@ public void testWriteSetTracking10() throws Exception { //now start concurrent txn swapTxnManager(txnMgr); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2", true)); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -1478,7 +1478,7 @@ public void testWriteSetTracking11() throws Exception { checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));//start "delete from tab1" txn + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2", true));//start "delete from tab1" txn long txnIdDelete = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1489,10 +1489,10 @@ public void testWriteSetTracking11() throws Exception { //now start concurrent "select * from tab1" txn swapTxnManager(txnMgr); checkCmdOnDriver(driver.run("start transaction"));//start explicit txn so that txnMgr knows it - checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); + checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'", true)); long txnIdSelect = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2", true)); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 5, locks.size()); @@ -1608,7 +1608,7 @@ public void testMultiInsert() throws Exception { public void testShowLocksAgentInfo() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists XYZ (a int, b int)"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8")); + checkCmdOnDriver(driver.compileAndRespond("select a from XYZ where b = 8", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "XYZ"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); @@ -1651,7 +1651,7 @@ private void testMerge3Way(boolean cc) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 - "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1 long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(); @@ -1668,7 +1668,7 @@ private void testMerge3Way(boolean cc) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 - "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)", true));//insert p=1/q=2, p=1/q=3 and new part 1/1 long txnId2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); locks = getLocks(); @@ -1859,10 +1859,10 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); checkCmdOnDriver(driver.run("create table source (a int, b int)")); if(causeConflict) { - checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); + checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1", true)); } else { - checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)")); + checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)", true)); } long txnid1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); @@ -1888,7 +1888,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + "on t.a=s.a " + "when matched then delete " + - "when not matched then insert values(s.a,s.b)")); + "when not matched then insert values(s.a,s.b)", true)); long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(); @@ -1965,7 +1965,7 @@ public void testDynamicPartitionInsert() throws Exception { "partitioned by (p int, q int) clustered by (a) into 2 buckets " + "stored as orc TBLPROPERTIES ('transactional'='true')")); long txnid1 = txnMgr.openTxn(ctx, "T1"); - checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); + checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1998,7 +1998,7 @@ public void testDynamicPartitionInsert() throws Exception { long txnid2 = txnMgr.openTxn(ctx, "T1"); - checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)")); + checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -2038,7 +2038,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); - checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); + checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1", true)); long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); List locks = getLocks(); @@ -2052,7 +2052,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + "on target.p=source.p1 and target.a=source.a1 " + "when matched then update set b = 11 " + - "when not matched then insert values(a1,b1,p1,q1)")); + "when not matched then insert values(a1,b1,p1,q1)", true)); long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr); @@ -2179,7 +2179,7 @@ public void testShowTablesLock() throws Exception { checkCmdOnDriver(cpr); long txnid1 = txnMgr.openTxn(ctx, "Fifer"); - checkCmdOnDriver(driver.compileAndRespond("insert into T values(1,3)")); + checkCmdOnDriver(driver.compileAndRespond("insert into T values(1,3)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -2189,7 +2189,7 @@ public void testShowTablesLock() throws Exception { DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "Fidler"); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("show tables")); + checkCmdOnDriver(driver.compileAndRespond("show tables", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -2208,7 +2208,7 @@ public void testShowTablesLock() throws Exception { "into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)")); + checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)", true)); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -2218,7 +2218,7 @@ public void testShowTablesLock() throws Exception { txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr2.openTxn(ctx, "Fidler"); swapTxnManager(txnMgr2); - checkCmdOnDriver(driver.compileAndRespond("show tables")); + checkCmdOnDriver(driver.compileAndRespond("show tables", true)); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -2235,12 +2235,12 @@ public void testFairness() throws Exception { dropTable(new String[] {"T6"}); CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("select a from T6"); + cpr = driver.compileAndRespond("select a from T6", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("drop table if exists T6"); + cpr = driver.compileAndRespond("drop table if exists T6", true); checkCmdOnDriver(cpr); //tries to get X lock on T6 and gets Waiting state LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); @@ -2253,7 +2253,7 @@ public void testFairness() throws Exception { swapTxnManager(txnMgr3); //this should block behind the X lock on T6 //this is a contrived example, in practice this query would of course fail after drop table - cpr = driver.compileAndRespond("select a from T6"); + cpr = driver.compileAndRespond("select a from T6", true); checkCmdOnDriver(cpr); ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);//gets S lock on T6 locks = getLocks(); @@ -2281,12 +2281,12 @@ public void testFairness2() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run( "insert into T7 partition(p) values(1,1),(1,2)"));//create 2 partitions - cpr = driver.compileAndRespond("select a from T7 "); + cpr = driver.compileAndRespond("select a from T7 ", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T7 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr2); - cpr = driver.compileAndRespond("alter table T7 drop partition (p=1)"); + cpr = driver.compileAndRespond("alter table T7 drop partition (p=1)", true); checkCmdOnDriver(cpr); //tries to get X lock on T7.p=1 and gets Waiting state LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, @@ -2301,7 +2301,7 @@ public void testFairness2() throws Exception { HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr3); //this should block behind the X lock on T7.p=1 - cpr = driver.compileAndRespond("select a from T7"); + cpr = driver.compileAndRespond("select a from T7", true); checkCmdOnDriver(cpr); //tries to get S lock on T7, S on T7.p=1 and S on T7.p=2 ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);